Skip to content

Commit

Permalink
Adding e2e tests for block-cache in pipeline (#1325)
Browse files Browse the repository at this point in the history
* Adding e2e tests for block-cache in pipeline
  • Loading branch information
vibhansa-msft committed Feb 27, 2024
1 parent d70a721 commit bd7dffc
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 127 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,10 @@
## 2.2.1 (Unreleased)
**Bug Fixes**

- Fixed panic while truncating a file to a very large size.
- Fixed block-cache panic on flush of a file which has no active changeset
- Fixed block-cache panic on renaming a file and then flushing older handle
- Fixed block-cache flush resulting in invalid-block-list error

**Features**

## 2.2.0 (2024-01-24)
Expand Down
20 changes: 19 additions & 1 deletion azure-pipeline-templates/verbose-tests.yml
Expand Up @@ -207,6 +207,25 @@ steps:
displayName: 'E2E Test: Mount with Key Credential Configuration'
timeoutInMinutes: 3
continueOnError: false
#--------------------------------------- Tests: End to end tests with Block Cache configurations ------------------------------------------
- template: e2e-tests-spcl.yml
parameters:
conf_template: azure_block_perf.yaml
config_file: ${{ parameters.config }}
container: ${{ parameters.container }}
temp_dir: ${{ parameters.temp_dir }}
mount_dir: ${{ parameters.mount_dir }}
adls: ${{ parameters.adls }}
account_name: ${{ parameters.account_name }}
account_key: ${{ parameters.account_key }}
account_type: ${{ parameters.account_type }}
account_endpoint: ${{ parameters.account_endpoint }}
idstring: "${{ parameters.service }} with Block-cache"
distro_name: ${{ parameters.distro_name }}
quick_test: ${{ parameters.quick_test }}
verbose_log: ${{ parameters.verbose_log }}
clone: false
stream_direct_test: false

- ${{ if eq(parameters.test_stream, true) }}:
- template: e2e-tests.yml
Expand Down Expand Up @@ -402,7 +421,6 @@ steps:
verbose_log: ${{ parameters.verbose_log }}
enable_symlink_adls: true


#--------------------------------------- Setup: End to end tests with different File Cache configurations ------------------------------------------
- script: |
cd ${{ parameters.working_dir }}
Expand Down
76 changes: 0 additions & 76 deletions blobfuse2-nightly.yaml
Expand Up @@ -1048,82 +1048,6 @@ stages:
displayName: 'Install fuse'
verbose_log: ${{ parameters.verbose_log }}

# Debian Tests
# - job: Set_8
# timeoutInMinutes: 60
# strategy:
# matrix:
# Debian-10.0:
# DistroVer: "Debian10.0"
# Description: "Debian 10"
# AgentName: "blobfuse-debian10"
# ContainerName: "test-cnt-deb-10"
# fuselib: 'fuse libfuse-dev'
# tags: 'fuse2'
# Debian-11.0:
# DistroVer: "Debian11.0"
# Description: "Debian 11"
# AgentName: "blobfuse-debian11"
# ContainerName: "test-cnt-deb-11"
# fuselib: 'fuse3 libfuse3-dev'
# tags: 'fuse3'

# pool:
# name: "blobfuse-debian-pool"
# demands:
# - ImageOverride -equals $(AgentName)

# variables:
# - group: NightlyBlobFuse
# - name: ROOT_DIR
# value: "/usr/pipeline/workv2"
# - name: WORK_DIR
# value: "/usr/pipeline/workv2/go/src/azure-storage-fuse"
# - name: skipComponentGovernanceDetection
# value: true
# - name: MOUNT_DIR
# value: "/usr/pipeline/workv2/blob_mnt"
# - name: TEMP_DIR
# value: "/usr/pipeline/workv2/temp"
# - name: BLOBFUSE2_CFG
# value: "/usr/pipeline/workv2/blobfuse2.yaml"
# - name: BLOBFUSE2_ADLS_CFG
# value: "/home/vsts/workv2/blobfuse2.adls.yaml"
# - name: GOPATH
# value: "/usr/pipeline/workv2/go"

# steps:
# # Go tool installer
# - task: GoTool@0
# inputs:
# version: '1.20.5'
# displayName: "Install Go Version"

# - template: 'azure-pipeline-templates/distro-tests.yml'
# parameters:
# working_dir: $(WORK_DIR)
# root_dir: $(ROOT_DIR)
# temp_dir: $(TEMP_DIR)
# mount_dir: $(MOUNT_DIR)
# config_path: $(BLOBFUSE2_CFG)
# container: $(ContainerName)
# blob_account_name: $(NIGHTLY_STO_BLOB_ACC_NAME)
# blob_account_key: $(NIGHTLY_STO_BLOB_ACC_KEY)
# adls_account_name: $(AZTEST_ADLS_ACC_NAME)
# adls_account_key: $(AZTEST_ADLS_KEY)
# distro_name: $(AgentName)
# tags: $(tags)
# fuselib: $(fuselib)
# gopath: $(GOPATH)
# installStep:
# script: |
# sudo rm /etc/apt/sources.list.d/azure.list
# sudo apt-get update --fix-missing -y
# sudo apt-get install $(fuselib) -y
# sudo apt-get install build-essential git python3 -y
# displayName: 'Install fuse'
# verbose_log: ${{ parameters.verbose_log }}

# SUSE Tests
- job: Set_9
timeoutInMinutes: 60
Expand Down
1 change: 1 addition & 0 deletions common/types.go
Expand Up @@ -64,6 +64,7 @@ const (
DefaultAllowOtherPermissionBits os.FileMode = 0777

MbToBytes = 1024 * 1024
GbToBytes = 1024 * MbToBytes
BfuseStats = "blobfuse_stats"

FuseAllowedFlags = "invalid FUSE options. Allowed FUSE configurations are: `-o attr_timeout=TIMEOUT`, `-o negative_timeout=TIMEOUT`, `-o entry_timeout=TIMEOUT` `-o allow_other`, `-o allow_root`, `-o umask=PERMISSIONS -o default_permissions`, `-o ro`"
Expand Down
8 changes: 6 additions & 2 deletions component/azstorage/azauthmsi.go
Expand Up @@ -174,7 +174,8 @@ func (azmsi *azAuthBlobMSI) getCredential() interface{} {
norefresh := false

msi_endpoint := os.Getenv("MSI_ENDPOINT")
if strings.Contains(msi_endpoint, "127.0.0.1:") {
if strings.Contains(msi_endpoint, "127.0.0.1:") || strings.Contains(msi_endpoint, "localhost:") ||
strings.Contains(azmsi.config.ActiveDirectoryEndpoint, "127.0.0.1:") {
// this might be AML workspace so try to get token using CLI
log.Info("azAuthBlobMSI::getCredential : Potential AML workspace detected")
token, err = azmsi.fetchTokenFromCLI()
Expand Down Expand Up @@ -273,7 +274,10 @@ func (azmsi *azAuthBfsMSI) getCredential() interface{} {
norefresh := false

msi_endpoint := os.Getenv("MSI_ENDPOINT")
if strings.Contains(msi_endpoint, "127.0.0.1:") {
log.Info("azAuthBfsMSI::getCredential : MSI_ENDPOINT = %v", msi_endpoint)

if strings.Contains(msi_endpoint, "127.0.0.1:") || strings.Contains(msi_endpoint, "localhost:") ||
strings.Contains(azmsi.config.ActiveDirectoryEndpoint, "127.0.0.1:") {
// this might be AML workspace so try to get token using CLI
log.Info("azAuthBfsMSI::getCredential : Potential AML workspace detected")
token, err = azmsi.fetchTokenFromCLI()
Expand Down
47 changes: 43 additions & 4 deletions component/azstorage/block_blob.go
Expand Up @@ -1073,11 +1073,50 @@ func (bb *BlockBlob) TruncateFile(name string, size int64) error {
return err
}
}
//TODO: the resize might be very big - need to allocate in chunks
if size == 0 || attr.Size == 0 {
err := bb.WriteFromBuffer(name, nil, make([]byte, size))
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to set the %s to 0 bytes [%s]", name, err.Error())
// If we are resizing to a value > 1GB then we need to upload multiple blocks to resize
if size > 1*common.GbToBytes {
blkSize := int64(16 * common.MbToBytes)
blobName := filepath.Join(bb.Config.prefixPath, name)
blobURL := bb.Container.NewBlockBlobURL(blobName)

blkList := make([]string, 0)
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))

for i := 0; size > 0; i++ {
if i == 0 || size < blkSize {
// Only first and last block we upload and rest all we replicate with the first block itself
if size < blkSize {
blkSize = size
id = base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
}
data := make([]byte, blkSize)

_, err = blobURL.StageBlock(context.Background(),
id,
bytes.NewReader(data),
bb.blobAccCond.LeaseAccessConditions,
nil,
bb.downloadOptions.ClientProvidedKeyOptions)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to stage block for %s [%s]", name, err.Error())
return err
}
}
blkList = append(blkList, id)
size -= blkSize
}

err = bb.CommitBlocks(blobName, blkList)
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to commit blocks for %s [%s]", name, err.Error())
return err
}
} else {
err := bb.WriteFromBuffer(name, nil, make([]byte, size))
if err != nil {
log.Err("BlockBlob::TruncateFile : Failed to set the %s to 0 bytes [%s]", name, err.Error())
}
}
return err
}
Expand Down
20 changes: 20 additions & 0 deletions component/azstorage/block_blob_test.go
Expand Up @@ -1292,6 +1292,26 @@ func (s *blockBlobTestSuite) TestTruncateSmallFileSmaller() {
s.assert.EqualValues(testData[:truncatedLength], output[:])
}

// TestTruncateEmptyFileToLargeSize verifies that truncating an empty blob to a
// size above the 1GB threshold succeeds and that the resulting blob reports the
// exact requested size. Sizes > 1GB exercise TruncateFile's multi-block staging
// path rather than the single WriteFromBuffer path.
func (s *blockBlobTestSuite) TestTruncateEmptyFileToLargeSize() {
	defer s.cleanupTest()
	// Setup: create an empty file on the container.
	name := generateFileName()
	h, err := s.az.CreateFile(internal.CreateFileOptions{Name: name})
	s.assert.Nil(err)
	s.assert.NotNil(h)

	// +13 makes the target size non-block-aligned, so the last staged block
	// must be smaller than the others.
	blobSize := int64((1 * common.GbToBytes) + 13)
	err = s.az.TruncateFile(internal.TruncateFileOptions{Name: name, Size: blobSize})
	s.assert.Nil(err)

	// The blob's reported size must match the requested size exactly.
	props, err := s.az.GetAttr(internal.GetAttrOptions{Name: name})
	s.assert.Nil(err)
	s.assert.NotNil(props)
	s.assert.EqualValues(blobSize, props.Size)

	// Cleanup: remove the large blob so it does not linger in the container.
	err = s.az.DeleteFile(internal.DeleteFileOptions{Name: name})
	s.assert.Nil(err)
}

func (s *blockBlobTestSuite) TestTruncateChunkedFileSmaller() {
defer s.cleanupTest()
// Setup
Expand Down
28 changes: 16 additions & 12 deletions component/block_cache/block_cache.go
Expand Up @@ -293,7 +293,9 @@ func (bc *BlockCache) CreateFile(options internal.CreateFileOptions) (*handlemap
handle.Size = 0
handle.Mtime = time.Now()

handle.Flags.Set(handlemap.HandleFlagDirty)
// As file is created on storage as well there is no need to mark this as dirty
// Any write operation to file will mark it dirty and flush will then reupload
// handle.Flags.Set(handlemap.HandleFlagDirty)
bc.prepareHandleForBlockCache(handle)
return handle, nil
}
Expand All @@ -318,14 +320,14 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
// If file is opened in truncate or wronly mode then we need to wipe out the data consider current file size as 0
handle.Size = 0
handle.Flags.Set(handlemap.HandleFlagDirty)
} else if options.Flags&os.O_RDWR != 0 {
} else if options.Flags&os.O_RDWR != 0 && handle.Size != 0 {
// File is not opened in read-only mode so we need to get the list of blocks and validate the size
// As there can be a potential write on this file, currently configured block size and block size of the file in container
// has to match otherwise it will corrupt the file. Fail the open call if this is not the case.
blockList, err := bc.NextComponent().GetCommittedBlockList(options.Name)
if err != nil || blockList == nil {
log.Err("BlockCache::OpenFile : Failed to get block list of %s [%s]", options.Name, err.Error())
return nil, err
log.Err("BlockCache::OpenFile : Failed to get block list of %s [%v]", options.Name, err)
return nil, fmt.Errorf("failed to retrieve block list for %s", options.Name)
}

lst, _ := handle.GetValue("blockList")
Expand All @@ -343,13 +345,15 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
}
}

// This shall be done after the refresh only as this will populate the queues created by above method
if handle.Size < int64(bc.blockSize) {
// File is small and can fit in one block itself
_ = bc.refreshBlock(handle, 0, false)
} else if bc.prefetchOnOpen && !bc.noPrefetch {
// Prefetch to start on open
_ = bc.startPrefetch(handle, 0, false)
if handle.Size > 0 {
// This shall be done after the refresh only as this will populate the queues created by above method
if handle.Size < int64(bc.blockSize) {
// File is small and can fit in one block itself
_ = bc.refreshBlock(handle, 0, false)
} else if bc.prefetchOnOpen && !bc.noPrefetch {
// Prefetch to start on open
_ = bc.startPrefetch(handle, 0, false)
}
}

return handle, nil
Expand Down Expand Up @@ -385,7 +389,6 @@ func (bc *BlockCache) FlushFile(options internal.FlushFileOptions) error {
return err
}

options.Handle.Flags.Clear(handlemap.HandleFlagDirty)
return nil
}

Expand Down Expand Up @@ -1191,6 +1194,7 @@ func (bc *BlockCache) commitBlocks(handle *handlemap.Handle) error {
return err
}

handle.Flags.Clear(handlemap.HandleFlagDirty)
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions component/block_cache/block_cache_test.go
Expand Up @@ -514,7 +514,7 @@ func (suite *blockCacheTestSuite) TestCreateFile() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

stroagePath := filepath.Join(tobj.fake_storage_path, path)
fs, err := os.Stat(stroagePath)
Expand Down Expand Up @@ -575,7 +575,7 @@ func (suite *blockCacheTestSuite) TestWriteFileSimple() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

stroagePath := filepath.Join(tobj.fake_storage_path, path)
fs, err := os.Stat(stroagePath)
Expand Down Expand Up @@ -643,7 +643,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlock() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // 5 bytes
suite.assert.Nil(err)
Expand Down Expand Up @@ -682,7 +682,7 @@ func (suite *blockCacheTestSuite) TestWriteFileMultiBlockWithOverwrite() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // 5 bytes
suite.assert.Nil(err)
Expand Down Expand Up @@ -733,7 +733,7 @@ func (suite *blockCacheTestSuite) TestWritefileWithAppend() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // 5 bytes
suite.assert.Nil(err)
Expand Down Expand Up @@ -830,7 +830,7 @@ func (suite *blockCacheTestSuite) TestDeleteAndRenameDirAndFile() {
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.True(h.Dirty())
suite.assert.False(h.Dirty())

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: []byte("Hello")}) // 5 bytes
suite.assert.Nil(err)
Expand Down
2 changes: 1 addition & 1 deletion component/libfuse/libfuse2_handler.go
Expand Up @@ -632,7 +632,7 @@ func libfuse_open(path *C.char, fi *C.fuse_file_info_t) C.int {
// TODO: Should this sit behind a user option? What if we change something to support these in the future?
// Mask out SYNC and DIRECT flags since write operation will fail
if fi.flags&C.O_SYNC != 0 || fi.flags&C.__O_DIRECT != 0 {
log.Err("Libfuse::libfuse2_open : Reset flags for open %s, fi.flags %X", name, fi.flags)
log.Info("Libfuse::libfuse2_open : Reset flags for open %s, fi.flags %X", name, fi.flags)
// Blobfuse2 does not support the SYNC or DIRECT flag. If a user application passes this flag on to blobfuse2
// and we open the file with this flag, subsequent write operations wlil fail with "Invalid argument" error.
// Mask them out here in the open call so that write works.
Expand Down
2 changes: 1 addition & 1 deletion component/libfuse/libfuse_handler.go
Expand Up @@ -667,7 +667,7 @@ func libfuse_open(path *C.char, fi *C.fuse_file_info_t) C.int {
// TODO: Should this sit behind a user option? What if we change something to support these in the future?
// Mask out SYNC and DIRECT flags since write operation will fail
if fi.flags&C.O_SYNC != 0 || fi.flags&C.__O_DIRECT != 0 {
log.Err("Libfuse::libfuse_open : Reset flags for open %s, fi.flags %X", name, fi.flags)
log.Info("Libfuse::libfuse_open : Reset flags for open %s, fi.flags %X", name, fi.flags)
// Blobfuse2 does not support the SYNC or DIRECT flag. If a user application passes this flag on to blobfuse2
// and we open the file with this flag, subsequent write operations will fail with "Invalid argument" error.
// Mask them out here in the open call so that write works.
Expand Down

0 comments on commit bd7dffc

Please sign in to comment.