From ca53ff44ab594e7e18b9447532c96711cf7f14b4 Mon Sep 17 00:00:00 2001 From: Swapnil Mhamane Date: Mon, 3 Dec 2018 19:32:17 +0530 Subject: [PATCH] Fix s3 snapstore data race condition Signed-off-by: Swapnil Mhamane --- LICENSE.md | 4 ++-- pkg/snapstore/abs_snapstore.go | 2 +- pkg/snapstore/abs_snapstore_test.go | 12 +++++----- pkg/snapstore/s3_snapstore_test.go | 11 +++++---- pkg/snapstore/types.go | 37 ++++++++++++++++------------- pkg/snapstore/utils.go | 12 ++++++++++ 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/LICENSE.md b/LICENSE.md index 47edb4ed5..96dcee432 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -258,12 +258,12 @@ MIT License (https://github.com/onsi/gomega/blob/master/LICENSE) Microsoft Azure SDK for Go. https://github.com/Azure/azure-storage-blob-go Copyright 2017 Microsoft -Apache 2 license (https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE) +MIT License (https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE) Microsoft Azure SDK for Go. https://github.com/Azure/azure-pipeline-go Copyright 2017 Microsoft -Apache 2 license (https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE) +MIT License (https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE) Google Cloud Client Libraries for Go https://github.com/GoogleCloudPlatform/google-cloud-go diff --git a/pkg/snapstore/abs_snapstore.go b/pkg/snapstore/abs_snapstore.go index 5e0a7e17d..e808055d4 100644 --- a/pkg/snapstore/abs_snapstore.go +++ b/pkg/snapstore/abs_snapstore.go @@ -64,7 +64,7 @@ func NewABSSnapStore(container, prefix, tempDir string, maxParallelChunkUploads Retry: azblob.RetryOptions{ TryTimeout: downloadTimeout, }}) - u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", storageAccount)) + u, err := url.Parse(fmt.Sprintf("https://%s.%s", storageAccount, AzureBlobStorageHostName)) if err != nil { return nil, fmt.Errorf("failed to parse service url: %v", err) } diff --git a/pkg/snapstore/abs_snapstore_test.go b/pkg/snapstore/abs_snapstore_test.go index 77b473fc8..a869d185c 100644 --- a/pkg/snapstore/abs_snapstore_test.go +++ b/pkg/snapstore/abs_snapstore_test.go @@ -40,7 +40,7 @@ func newFakeABSSnapstore() SnapStore { newFakePolicyFactory(bucket, prefix, objectMap), } p := pipeline.NewPipeline(f, pipeline.Options{HTTPSender: newFakePolicyFactory(bucket, prefix, objectMap)}) - u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", "dummyaccount")) + u, err := url.Parse(fmt.Sprintf("https://%s.%s", "dummyaccount", AzureBlobStorageHostName)) Expect(err).ShouldNot(HaveOccurred()) serviceURL := azblob.NewServiceURL(*u, p) containerURL := serviceURL.NewContainerURL(bucket) @@ -49,6 +49,9 @@ func newFakeABSSnapstore() SnapStore { return a } +// Please follow the link https://github.com/Azure/azure-pipeline-go/blob/master/pipeline/policies_test.go +// for details about details of azure policy, policy factory and pipeline + // newFakePolicyFactory creates a 'Fake' policy factory. func newFakePolicyFactory(bucket, prefix string, objectMap map[string]*[]byte) pipeline.Factory { return &fakePolicyFactory{bucket, prefix, objectMap} @@ -82,12 +85,9 @@ type fakePolicy struct { multiPartUploadsMutex sync.Mutex } +// Do method is called on pipeline to process the request. This will internally call the `Do` method +// on next policies in pieline and return the responce from it. func (p *fakePolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) { - // Your code should NOT mutate the ctx or request parameters - // However, you can make a copy of the request and mutate the copy - // You can also pass a different Context on. - // You can optionally use po (PolicyOptions) in this func. - httpReq, err := http.NewRequest(request.Method, request.URL.String(), request.Body) if err != nil { return nil, err diff --git a/pkg/snapstore/s3_snapstore_test.go b/pkg/snapstore/s3_snapstore_test.go index 505bf0b8b..14b0c3e17 100644 --- a/pkg/snapstore/s3_snapstore_test.go +++ b/pkg/snapstore/s3_snapstore_test.go @@ -73,20 +73,21 @@ func (m *mockS3Client) CreateMultipartUploadWithContext(ctx aws.Context, in *s3. } func (m *mockS3Client) UploadPartWithContext(ctx aws.Context, in *s3.UploadPartInput, opts ...request.Option) (*s3.UploadPartOutput, error) { - if m.multiPartUploads[*in.UploadId] == nil { - return nil, fmt.Errorf("multipart upload not initiated") - } if *in.PartNumber < 0 { return nil, fmt.Errorf("part number should be positive integer") } + m.multiPartUploadsMutex.Lock() + if m.multiPartUploads[*in.UploadId] == nil { + m.multiPartUploadsMutex.Unlock() + return nil, fmt.Errorf("multipart upload not initiated") + } if *in.PartNumber > int64(len(*m.multiPartUploads[*in.UploadId])) { - m.multiPartUploadsMutex.Lock() t := make([][]byte, *in.PartNumber) copy(t, *m.multiPartUploads[*in.UploadId]) delete(m.multiPartUploads, *in.UploadId) m.multiPartUploads[*in.UploadId] = &t - m.multiPartUploadsMutex.Unlock() } + m.multiPartUploadsMutex.Unlock() temp, err := ioutil.ReadAll(in.Body) if err != nil { return nil, fmt.Errorf("failed to read complete body %v", err) diff --git a/pkg/snapstore/types.go b/pkg/snapstore/types.go index 8c5497340..f83aad7c8 100644 --- a/pkg/snapstore/types.go +++ b/pkg/snapstore/types.go @@ -24,49 +24,52 @@ import ( // Only purpose of these implementation to provide CPI layer to // access files. type SnapStore interface { - // Fetch should open reader for the snapshot file from store + // Fetch should open reader for the snapshot file from store. Fetch(Snapshot) (io.ReadCloser, error) - // List will list all snapshot files on store + // List will list all snapshot files on store. List() (SnapList, error) - // Save will write the snapshot to store + // Save will write the snapshot to store. Save(Snapshot, io.Reader) error - // Delete should delete the snapshot file from store + // Delete should delete the snapshot file from store. Delete(Snapshot) error } const ( - // minChunkSize is set to 5Mib since AWS doesn't allow chunk size less than that + // minChunkSize is set to 5Mib since it is lower chunk size limit for AWS. minChunkSize int64 = 5 * (1 << 20) //5 MiB - // SnapstoreProviderLocal is constant for local disk storage provider + // SnapstoreProviderLocal is constant for local disk storage provider. SnapstoreProviderLocal = "Local" - // SnapstoreProviderS3 is constant for aws S3 storage provider + // SnapstoreProviderS3 is constant for aws S3 storage provider. SnapstoreProviderS3 = "S3" - // SnapstoreProviderABS is constant for azure blob storage provider + // SnapstoreProviderABS is constant for azure blob storage provider. SnapstoreProviderABS = "ABS" - // SnapstoreProviderGCS is constant for GCS object storage provider + // SnapstoreProviderGCS is constant for GCS object storage provider. SnapstoreProviderGCS = "GCS" - // SnapstoreProviderSwift is constant for Swift object storage + // SnapstoreProviderSwift is constant for Swift object storage. SnapstoreProviderSwift = "Swift" - // SnapshotKindFull is constant for full snapshot kind + // SnapshotKindFull is constant for full snapshot kind. SnapshotKindFull = "Full" - // SnapshotKindDelta is constant for delta snapshot kind + // SnapshotKindDelta is constant for delta snapshot kind. SnapshotKindDelta = "Incr" - // chunkUploadTimeout is timeout for uploading chunk + // chunkUploadTimeout is timeout for uploading chunk. chunkUploadTimeout = 180 * time.Second - // providerConnectionTimeout is timeout for connection/short queries to cloud provider + // providerConnectionTimeout is timeout for connection/short queries to cloud provider. providerConnectionTimeout = 30 * time.Second - // downloadTimeout is timeout for downloading chunk + // downloadTimeout is timeout for downloading chunk. downloadTimeout = 5 * time.Minute tmpBackupFilePrefix = "etcd-backup-" // maxRetryAttempts indicates the number of attempts to be retried in case of failure to upload chunk. maxRetryAttempts = 5 + + // AzureBlobStorageHostName is the host name for azure blob storage service. + AzureBlobStorageHostName = "blob.core.windows.net" ) -// Snapshot structure represents the metadata of snapshot +// Snapshot structure represents the metadata of snapshot.s type Snapshot struct { Kind string //incr:incremental,full:full StartRevision int64 @@ -77,7 +80,7 @@ type Snapshot struct { IsChunk bool } -// SnapList is list of snapshots +// SnapList is list of snapshots. type SnapList []*Snapshot // Config defines the configuration to create snapshot store. diff --git a/pkg/snapstore/utils.go b/pkg/snapstore/utils.go index 092eae14d..e331de249 100644 --- a/pkg/snapstore/utils.go +++ b/pkg/snapstore/utils.go @@ -33,6 +33,18 @@ func GetSnapstore(config *Config) (SnapStore, error) { if config.Container == "" { config.Container = os.Getenv(envStorageContainer) } + + if _, err := os.Stat(config.TempDir); err != nil { + if os.IsNotExist(err) { + logrus.Infof("Temporary directory %s does not exit. Creating it...", config.TempDir) + if err := os.MkdirAll(config.TempDir, 0700); err != nil { + return nil, fmt.Errorf("failed to create temporary directory %s: %v", config.TempDir, err) + } + } else { + return nil, fmt.Errorf("failed to get file info of temporary directory %s: %v", config.TempDir, err) + } + } + if config.MaxParallelChunkUploads <= 0 { config.MaxParallelChunkUploads = 5 }