Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/bucket/azure/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,11 @@ func (c *BlobClient) FGetObject(ctx context.Context, bucketName, objectName, loc
// If the underlying client or the visit callback returns an error,
// it returns early.
func (c *BlobClient) VisitObjects(ctx context.Context, bucketName string, prefix string, visit func(path, etag string) error) error {
items := c.NewListBlobsFlatPager(bucketName, nil)
opts := &azblob.ListBlobsFlatOptions{}
if prefix != "" {
opts.Prefix = &prefix
}
items := c.NewListBlobsFlatPager(bucketName, opts)
for items.More() {
resp, err := items.NextPage(ctx)
if err != nil {
Expand Down
81 changes: 81 additions & 0 deletions internal/bucket/azure/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,87 @@ func Test_sasTokenFromSecret(t *testing.T) {
}
}

func TestBlobClient_VisitObjects_Prefix(t *testing.T) {
bucketName := "test-bucket"

tests := []struct {
name string
prefix string
}{
{
name: "with prefix",
prefix: "subfolder/",
},
{
name: "without prefix",
prefix: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

// start mock bucket server
bucketListener, bucketAddr, _ := testlistener.New(t)
bucketEndpoint := fmt.Sprintf("http://%s", bucketAddr)
bucketHandler := http.NewServeMux()
bucketHandler.HandleFunc(fmt.Sprintf("GET /%s", bucketName), func(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
g.Expect(q.Get("comp")).To(Equal("list"))
g.Expect(q.Get("restype")).To(Equal("container"))

// Assert the prefix query parameter.
if tt.prefix != "" {
g.Expect(q.Get("prefix")).To(Equal(tt.prefix))
} else {
g.Expect(q.Has("prefix")).To(BeFalse(), "prefix query parameter should not be set when prefix is empty")
}

resp := fmt.Sprintf(`<?xml version="1.0" encoding="utf-8"?>
<EnumerationResults ContainerName="%s/%s">
<Blobs>
<Blob>
<Name>%sfile.txt</Name>
<Properties>
<Etag>0x8D9B2A2A2A2A2A2</Etag>
</Properties>
</Blob>
</Blobs>
<NextMarker />
</EnumerationResults>`, bucketEndpoint, bucketName, tt.prefix)
_, err := w.Write([]byte(resp))
g.Expect(err).ToNot(HaveOccurred())
})
bucketServer := &http.Server{
Addr: bucketAddr,
Handler: bucketHandler,
}
go bucketServer.Serve(bucketListener)
defer bucketServer.Shutdown(context.Background())

bucket := &sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
Endpoint: bucketEndpoint,
},
}
client, err := NewClient(t.Context(),
bucket,
withoutCredentials(),
withoutRetries())
g.Expect(err).ToNot(HaveOccurred())

var visited []string
err = client.VisitObjects(t.Context(), bucketName, tt.prefix, func(path, etag string) error {
visited = append(visited, path)
return nil
})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(visited).To(Equal([]string{tt.prefix + "file.txt"}))
})
}
}

func Test_chainCredentialWithSecret(t *testing.T) {
g := NewWithT(t)

Expand Down
Loading