Skip to content

Commit

Permalink
Fix s3 snapstore data race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <swapnil.mhamane@sap.com>
  • Loading branch information
Swapnil Mhamane committed Dec 4, 2018
1 parent 78a6753 commit ca53ff4
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
4 changes: 2 additions & 2 deletions LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ MIT License (https://github.com/onsi/gomega/blob/master/LICENSE)
Microsoft Azure SDK for Go.
https://github.com/Azure/azure-storage-blob-go
Copyright 2017 Microsoft
Apache 2 license (https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE)
MIT License (https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE)

Microsoft Azure SDK for Go.
https://github.com/Azure/azure-pipeline-go
Copyright 2017 Microsoft
Apache 2 license (https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE)
MIT License (https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE)

Google Cloud Client Libraries for Go
https://github.com/GoogleCloudPlatform/google-cloud-go
Expand Down
2 changes: 1 addition & 1 deletion pkg/snapstore/abs_snapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewABSSnapStore(container, prefix, tempDir string, maxParallelChunkUploads
Retry: azblob.RetryOptions{
TryTimeout: downloadTimeout,
}})
u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", storageAccount))
u, err := url.Parse(fmt.Sprintf("https://%s.%s", storageAccount, AzureBlobStorageHostName))
if err != nil {
return nil, fmt.Errorf("failed to parse service url: %v", err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/snapstore/abs_snapstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newFakeABSSnapstore() SnapStore {
newFakePolicyFactory(bucket, prefix, objectMap),
}
p := pipeline.NewPipeline(f, pipeline.Options{HTTPSender: newFakePolicyFactory(bucket, prefix, objectMap)})
u, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", "dummyaccount"))
u, err := url.Parse(fmt.Sprintf("https://%s.%s", "dummyaccount", AzureBlobStorageHostName))
Expect(err).ShouldNot(HaveOccurred())
serviceURL := azblob.NewServiceURL(*u, p)
containerURL := serviceURL.NewContainerURL(bucket)
Expand All @@ -49,6 +49,9 @@ func newFakeABSSnapstore() SnapStore {
return a
}

// Please follow the link https://github.com/Azure/azure-pipeline-go/blob/master/pipeline/policies_test.go
// for details about details of azure policy, policy factory and pipeline

// newFakePolicyFactory creates a 'Fake' policy factory.
func newFakePolicyFactory(bucket, prefix string, objectMap map[string]*[]byte) pipeline.Factory {
return &fakePolicyFactory{bucket, prefix, objectMap}
Expand Down Expand Up @@ -82,12 +85,9 @@ type fakePolicy struct {
multiPartUploadsMutex sync.Mutex
}

// Do method is called on pipeline to process the request. This will internally call the `Do` method
// on next policies in pieline and return the responce from it.
func (p *fakePolicy) Do(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
// Your code should NOT mutate the ctx or request parameters
// However, you can make a copy of the request and mutate the copy
// You can also pass a different Context on.
// You can optionally use po (PolicyOptions) in this func.

httpReq, err := http.NewRequest(request.Method, request.URL.String(), request.Body)
if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions pkg/snapstore/s3_snapstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,21 @@ func (m *mockS3Client) CreateMultipartUploadWithContext(ctx aws.Context, in *s3.
}

func (m *mockS3Client) UploadPartWithContext(ctx aws.Context, in *s3.UploadPartInput, opts ...request.Option) (*s3.UploadPartOutput, error) {
if m.multiPartUploads[*in.UploadId] == nil {
return nil, fmt.Errorf("multipart upload not initiated")
}
if *in.PartNumber < 0 {
return nil, fmt.Errorf("part number should be positive integer")
}
m.multiPartUploadsMutex.Lock()
if m.multiPartUploads[*in.UploadId] == nil {
m.multiPartUploadsMutex.Unlock()
return nil, fmt.Errorf("multipart upload not initiated")
}
if *in.PartNumber > int64(len(*m.multiPartUploads[*in.UploadId])) {
m.multiPartUploadsMutex.Lock()
t := make([][]byte, *in.PartNumber)
copy(t, *m.multiPartUploads[*in.UploadId])
delete(m.multiPartUploads, *in.UploadId)
m.multiPartUploads[*in.UploadId] = &t
m.multiPartUploadsMutex.Unlock()
}
m.multiPartUploadsMutex.Unlock()
temp, err := ioutil.ReadAll(in.Body)
if err != nil {
return nil, fmt.Errorf("failed to read complete body %v", err)
Expand Down
37 changes: 20 additions & 17 deletions pkg/snapstore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,52 @@ import (
// Only purpose of these implementation to provide CPI layer to
// access files.
type SnapStore interface {
// Fetch should open reader for the snapshot file from store
// Fetch should open reader for the snapshot file from store.
Fetch(Snapshot) (io.ReadCloser, error)
// List will list all snapshot files on store
// List will list all snapshot files on store.
List() (SnapList, error)
// Save will write the snapshot to store
// Save will write the snapshot to store.
Save(Snapshot, io.Reader) error
// Delete should delete the snapshot file from store
// Delete should delete the snapshot file from store.
Delete(Snapshot) error
}

const (
// minChunkSize is set to 5Mib since AWS doesn't allow chunk size less than that
// minChunkSize is set to 5Mib since it is lower chunk size limit for AWS.
minChunkSize int64 = 5 * (1 << 20) //5 MiB
// SnapstoreProviderLocal is constant for local disk storage provider
// SnapstoreProviderLocal is constant for local disk storage provider.
SnapstoreProviderLocal = "Local"
// SnapstoreProviderS3 is constant for aws S3 storage provider
// SnapstoreProviderS3 is constant for aws S3 storage provider.
SnapstoreProviderS3 = "S3"
// SnapstoreProviderABS is constant for azure blob storage provider
// SnapstoreProviderABS is constant for azure blob storage provider.
SnapstoreProviderABS = "ABS"
// SnapstoreProviderGCS is constant for GCS object storage provider
// SnapstoreProviderGCS is constant for GCS object storage provider.
SnapstoreProviderGCS = "GCS"
// SnapstoreProviderSwift is constant for Swift object storage
// SnapstoreProviderSwift is constant for Swift object storage.
SnapstoreProviderSwift = "Swift"

// SnapshotKindFull is constant for full snapshot kind
// SnapshotKindFull is constant for full snapshot kind.
SnapshotKindFull = "Full"
// SnapshotKindDelta is constant for delta snapshot kind
// SnapshotKindDelta is constant for delta snapshot kind.
SnapshotKindDelta = "Incr"

// chunkUploadTimeout is timeout for uploading chunk
// chunkUploadTimeout is timeout for uploading chunk.
chunkUploadTimeout = 180 * time.Second
// providerConnectionTimeout is timeout for connection/short queries to cloud provider
// providerConnectionTimeout is timeout for connection/short queries to cloud provider.
providerConnectionTimeout = 30 * time.Second
// downloadTimeout is timeout for downloading chunk
// downloadTimeout is timeout for downloading chunk.
downloadTimeout = 5 * time.Minute

tmpBackupFilePrefix = "etcd-backup-"

// maxRetryAttempts indicates the number of attempts to be retried in case of failure to upload chunk.
maxRetryAttempts = 5

// AzureBlobStorageHostName is the host name for azure blob storage service.
AzureBlobStorageHostName = "blob.core.windows.net"
)

// Snapshot structure represents the metadata of snapshot
// Snapshot structure represents the metadata of snapshot.s
type Snapshot struct {
Kind string //incr:incremental,full:full
StartRevision int64
Expand All @@ -77,7 +80,7 @@ type Snapshot struct {
IsChunk bool
}

// SnapList is list of snapshots
// SnapList is list of snapshots.
type SnapList []*Snapshot

// Config defines the configuration to create snapshot store.
Expand Down
12 changes: 12 additions & 0 deletions pkg/snapstore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ func GetSnapstore(config *Config) (SnapStore, error) {
if config.Container == "" {
config.Container = os.Getenv(envStorageContainer)
}

if _, err := os.Stat(config.TempDir); err != nil {
if os.IsNotExist(err) {
logrus.Infof("Temporary directory %s does not exit. Creating it...", config.TempDir)
if err := os.MkdirAll(config.TempDir, 0700); err != nil {
return nil, fmt.Errorf("failed to create temporary directory %s: %v", config.TempDir, err)
}
} else {
return nil, fmt.Errorf("failed to get file info of temporary directory %s: %v", config.TempDir, err)
}
}

if config.MaxParallelChunkUploads <= 0 {
config.MaxParallelChunkUploads = 5
}
Expand Down

0 comments on commit ca53ff4

Please sign in to comment.