Skip to content

Commit

Permalink
Update S3 collector to support collecting from a directory within the…
Browse files Browse the repository at this point in the history
… bucket (#1871)

* Use prefix parameter to narrow list of s3 bucket files

Signed-off-by: Eszter Szucs-Matyas <eszter.szucs.matyas@gmail.com>

* Do not try handle s3 folders as files

Signed-off-by: Eszter Szucs-Matyas <eszter.szucs.matyas@gmail.com>

* Clean up variable naming and docs

Signed-off-by: Eszter Szucs-Matyas <eszter.szucs.matyas@gmail.com>

* add s3 path collect example command

Signed-off-by: Eszter Szucs-Matyas <eszter.szucs.matyas@gmail.com>

---------

Signed-off-by: Eszter Szucs-Matyas <eszter.szucs.matyas@gmail.com>
  • Loading branch information
huggingpixels committed May 7, 2024
1 parent 286c0f8 commit 73108d1
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 11 deletions.
21 changes: 17 additions & 4 deletions cmd/guaccollect/cmd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type s3Options struct {
blobAddr string // address for the blob store to connect to
s3url string // base url of the s3 to collect from
s3bucket string // name of bucket to collect from
s3path string // path to s3 folder with documents to collect
s3item string // s3 item (only for non-polling behaviour)
region string // AWS region, for s3/sqs configuration (defaults to us-east-1)
queues string // comma-separated list of queues/topics (only for polling behaviour)
Expand All @@ -34,7 +35,9 @@ type s3Options struct {
var s3Cmd = &cobra.Command{
Use: "s3 [flags]",
Short: "takes SBOMs and attestations from S3 compatible bucket and injects them to GUAC graph",
Long: `S3 collector can download one item from the storage, the whole bucket or listen to storage events using sqs/kafka (poll) and download the files as they are uploaded.
Long: `
guaccollect S3 collector can download one item from the storage, all items from a folder, a whole bucket
or listen to storage events using sqs/kafka (poll) and download the files as they are uploaded.
Make sure that access credentials variables are properly set.`,
Example: `Create example bucket:
Expand All @@ -48,8 +51,14 @@ $ export AWS_SECRET_ACCESS_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
Ingest:
$ guacone collect s3 --s3-url https://play.min.io --s3-bucket guac-test
$ guacone collect s3 --s3-url play.min.io --s3-bucket guac-test --s3-item alpine-cyclonedx.json
$ guaccollect s3 --s3-url https://play.min.io --s3-bucket guac-test
$ guaccollect s3 --s3-url play.min.io --s3-bucket guac-test --s3-item alpine-cyclonedx.json
Ingest from AWS using default url:
$ guaccollect s3 --s3-bucket guac-test --s3-region eu-north-1
$ guaccollect s3 --s3-bucket guac-test --s3-region eu-north-1 --s3-path sboms/
For the polling option, you need to define event bus endpoint for bucket notifications:
Expand All @@ -66,6 +75,7 @@ $ guacone collect s3 --s3-url http://localhost:9000 --s3-bucket guac-test --poll
viper.GetString("csub-addr"),
viper.GetString("s3-url"),
viper.GetString("s3-bucket"),
viper.GetString("s3-path"),
viper.GetString("s3-region"),
viper.GetString("s3-item"),
viper.GetString("s3-mp"),
Expand All @@ -88,6 +98,7 @@ $ guacone collect s3 --s3-url http://localhost:9000 --s3-bucket guac-test --poll
S3Url: s3Opts.s3url,
S3Bucket: s3Opts.s3bucket,
S3Region: s3Opts.region,
S3Path: s3Opts.s3path,
S3Item: s3Opts.s3item,
MessageProvider: s3Opts.mp,
MessageProviderEndpoint: s3Opts.mpEndpoint,
Expand Down Expand Up @@ -117,6 +128,7 @@ func validateS3Opts(
csubAddr,
s3url,
s3bucket,
s3path,
region,
s3item,
mp,
Expand Down Expand Up @@ -153,6 +165,7 @@ func validateS3Opts(
blobAddr: blobAddr,
s3url: s3url,
s3bucket: s3bucket,
s3path: s3path,
s3item: s3item,
region: region,
queues: queues,
Expand All @@ -166,7 +179,7 @@ func validateS3Opts(
}

func init() {
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-item", "s3-region", "s3-queues", "s3-mp", "s3-mp-endpoint"})
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-path", "s3-item", "s3-region", "s3-queues", "s3-mp", "s3-mp-endpoint"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
9 changes: 6 additions & 3 deletions cmd/guacone/cmd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
type s3Options struct {
s3url string // base url of the s3 to collect from
s3bucket string // name of bucket to collect from
s3path string // path to s3 folder with documents to collect
s3item string // s3 item (only for non-polling behaviour)
region string // AWS region, for s3/sqs configuration (defaults to us-east-1)
queues string // comma-separated list of queues/topics (only for polling behaviour)
Expand Down Expand Up @@ -82,6 +83,7 @@ $ guacone collect s3 --s3-url http://localhost:9000 --s3-bucket guac-test --poll
viper.GetString("csub-addr"),
viper.GetString("s3-url"),
viper.GetString("s3-bucket"),
viper.GetString("s3-path"),
viper.GetString("s3-region"),
viper.GetString("s3-item"),
viper.GetString("s3-mp"),
Expand All @@ -108,6 +110,7 @@ $ guacone collect s3 --s3-url http://localhost:9000 --s3-bucket guac-test --poll
S3Url: s3Opts.s3url,
S3Bucket: s3Opts.s3bucket,
S3Region: s3Opts.region,
S3Path: s3Opts.s3path,
S3Item: s3Opts.s3item,
MessageProvider: s3Opts.mp,
MessageProviderEndpoint: s3Opts.mpEndpoint,
Expand Down Expand Up @@ -176,7 +179,7 @@ $ guacone collect s3 --s3-url http://localhost:9000 --s3-bucket guac-test --poll
},
}

func validateS3Opts(graphqlEndpoint, headerFile, csubAddr, s3url, s3bucket, region, s3item, mp, mpEndpoint, queues string, csubTls, csubTlsSkipVerify, poll bool) (s3Options, error) {
func validateS3Opts(graphqlEndpoint, headerFile, csubAddr, s3url, s3bucket, s3path, region, s3item, mp, mpEndpoint, queues string, csubTls, csubTlsSkipVerify, poll bool) (s3Options, error) {
var opts s3Options

if poll {
Expand All @@ -199,13 +202,13 @@ func validateS3Opts(graphqlEndpoint, headerFile, csubAddr, s3url, s3bucket, regi
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}

opts = s3Options{s3url, s3bucket, s3item, region, queues, mp, mpEndpoint, poll, graphqlEndpoint, headerFile, csubClientOptions}
opts = s3Options{s3url, s3bucket, s3path, s3item, region, queues, mp, mpEndpoint, poll, graphqlEndpoint, headerFile, csubClientOptions}

return opts, nil
}

func init() {
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-item", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"})
set, err := cli.BuildFlags([]string{"s3-url", "s3-bucket", "s3-region", "s3-path", "s3-item", "s3-mp", "s3-mp-endpoint", "s3-queues", "poll"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %s", err)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func init() {

// S3 flags
set.String("s3-url", "", "url of the s3 endpoint")
set.String("s3-path", "", "path to folder containing documents in the s3 bucket")
set.String("s3-bucket", "", "bucket in the s3 provider")
set.String("s3-item", "", "item in the s3 provider")
set.String("s3-mp", "kafka", "message provider (sqs or kafka)")
Expand Down
10 changes: 8 additions & 2 deletions pkg/handler/collector/s3/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -42,7 +43,7 @@ func (bd *BucketBuilder) GetBucket(url string, region string) Bucket {
}

type Bucket interface {
ListFiles(ctx context.Context, bucket string, token *string, max int32) ([]string, *string, error)
ListFiles(ctx context.Context, bucket string, prefix string, token *string, max int32) ([]string, *string, error)
DownloadFile(ctx context.Context, bucket string, item string) ([]byte, error)
GetEncoding(ctx context.Context, bucket string, item string) (string, error)
}
Expand All @@ -56,7 +57,7 @@ func GetDefaultBucket(url string, region string) Bucket {
return &s3Bucket{url, region}
}

func (d *s3Bucket) ListFiles(ctx context.Context, bucket string, token *string, max int32) ([]string, *string, error) {
func (d *s3Bucket) ListFiles(ctx context.Context, bucket string, prefix string, token *string, max int32) ([]string, *string, error) {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, nil, fmt.Errorf("error loading AWS SDK config: %w", err)
Expand All @@ -75,6 +76,7 @@ func (d *s3Bucket) ListFiles(ctx context.Context, bucket string, token *string,

input := &s3.ListObjectsV2Input{
Bucket: &bucket,
Prefix: &prefix,
ContinuationToken: token,
MaxKeys: aws.Int32(max),
}
Expand All @@ -85,6 +87,10 @@ func (d *s3Bucket) ListFiles(ctx context.Context, bucket string, token *string,

var files []string
for _, item := range resp.Contents {
// ignore s3 objects that are directories
if strings.HasSuffix(*item.Key, "/") {
continue
}
files = append(files, *item.Key)
}
return files, resp.NextContinuationToken, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/collector/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type S3CollectorConfig struct {
MessageProviderEndpoint string // optional if using the sqs message provider
S3Url string // optional (uses aws sdk defaults)
S3Bucket string // bucket name to collect from
S3Path string // optional (only for non-polling) s3 folder path to collect from
S3Item string // optional (only for non-polling behaviour)
S3Region string // optional (defaults to us-east-1, assumes same region for s3 and sqs)
Queues string // optional (comma-separated list of queues/topics)
Expand Down Expand Up @@ -100,7 +101,7 @@ func retrieve(s S3Collector, ctx context.Context, docChannel chan<- *processor.D
var token *string
const MaxKeys = 100
for {
files, t, err := downloader.ListFiles(ctx, s.config.S3Bucket, token, MaxKeys)
files, t, err := downloader.ListFiles(ctx, s.config.S3Bucket, s.config.S3Path, token, MaxKeys)
if err != nil {
logger.Errorf("could not list files %v: %v", item, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/collector/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (tb *TestMpBuilder) GetMessageProvider(config messaging.MessageProviderConf
type TestBucket struct {
}

func (td *TestBucket) ListFiles(ctx context.Context, bucket string, token *string, max int32) ([]string, *string, error) {
func (td *TestBucket) ListFiles(ctx context.Context, bucket string, prefix string, token *string, max int32) ([]string, *string, error) {
return []string{"no-poll-item"}, nil, nil
}

Expand Down

0 comments on commit 73108d1

Please sign in to comment.