Skip to content

Commit

Permalink
Always try to create a bucket when loading a container (flyteorg#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan committed Mar 10, 2021
1 parent 4816e8b commit d411f70
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 27 deletions.
38 changes: 12 additions & 26 deletions storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,6 @@ var fQNFn = map[string]func(string) DataReference{
},
}

// Checks if the error is AWS S3 bucket not found error
func awsBucketIsNotFound(err error) bool {
if IsNotFound(err) {
return true
}

if awsErr, errOk := errs.Cause(err).(awserr.Error); errOk {
return awsErr.Code() == s32.ErrCodeNoSuchBucket
}

return false
}

// Checks if the error is AWS S3 bucket already exists error.
func awsBucketAlreadyExists(err error) bool {
if IsExists(err) {
Expand Down Expand Up @@ -124,21 +111,20 @@ type StowStore struct {
}

func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error) {
// TODO: As of stow v0.2.6 elides the container lookup when a bucket region is set,
// so we always just attempt to create it when createIfNotFound is true.

if createIfNotFound {
logger.Infof(ctx, "Attempting to create container [%s]", container)
_, err := s.loc.CreateContainer(container)
if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) {
return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err)
}
}

c, err := s.loc.Container(container)
if err != nil {
if createIfNotFound {
logger.Infof(ctx, "Container [%s] lookup failed, err [%s], will try to create a new one", container, err)
if IsNotFound(err) || awsBucketIsNotFound(err) {
c, err := s.loc.CreateContainer(container)
// If the container's already created, move on. Otherwise, fail with error.
if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) {
return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err)
}
return c, nil
}
} else {
logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err)
}
logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err)
return nil, err
}
return c, nil
Expand Down
91 changes: 90 additions & 1 deletion storage/stow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ import (

type mockStowLoc struct {
stow.Location
ContainerCb func(id string) (stow.Container, error)
ContainerCb func(id string) (stow.Container, error)
CreateContainerCb func(name string) (stow.Container, error)
}

func (m mockStowLoc) Container(id string) (stow.Container, error) {
return m.ContainerCb(id)
}

func (m mockStowLoc) CreateContainer(name string) (stow.Container, error) {
return m.CreateContainerCb(name)
}

type mockStowContainer struct {
id string
items map[string]mockStowItem
Expand Down Expand Up @@ -133,6 +138,12 @@ func TestStowStore_ReadRaw(t *testing.T) {
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, false, testScope)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{}))
Expand All @@ -158,6 +169,12 @@ func TestStowStore_ReadRaw(t *testing.T) {
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, false, testScope)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 3*MiB, Options{}, bytes.NewReader([]byte{}))
Expand All @@ -183,6 +200,12 @@ func TestStowStore_ReadRaw(t *testing.T) {
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, true, testScope)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), "s3://bad-container/path", 0, Options{}, bytes.NewReader([]byte{}))
Expand All @@ -209,6 +232,12 @@ func TestStowStore_ReadRaw(t *testing.T) {
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, true, testScope)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), "s3://bad-container/path", 0, Options{}, bytes.NewReader([]byte{}))
Expand Down Expand Up @@ -373,3 +402,63 @@ func Test_newStowRawStore(t *testing.T) {
})
}
}

func TestLoadContainer(t *testing.T) {
container := "container"
t.Run("Create if not found", func(t *testing.T) {
stowStore := StowStore{
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
},
}
stowContainer, err := stowStore.LoadContainer(context.Background(), "container", true)
assert.NoError(t, err)
assert.Equal(t, container, stowContainer.ID())
})
t.Run("Create if not found with error", func(t *testing.T) {
stowStore := StowStore{
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return nil, fmt.Errorf("foo")
}
return nil, fmt.Errorf("container is not supported")
},
},
}
_, err := stowStore.LoadContainer(context.Background(), "container", true)
assert.EqualError(t, err, "unable to initialize container [container]. Error: foo")
})
t.Run("No create if not found", func(t *testing.T) {
stowStore := StowStore{
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
},
}
stowContainer, err := stowStore.LoadContainer(context.Background(), "container", false)
assert.NoError(t, err)
assert.Equal(t, container, stowContainer.ID())
})
}

0 comments on commit d411f70

Please sign in to comment.