diff --git a/cmd/guacone/cmd/gcs.go b/cmd/guacone/cmd/gcs.go new file mode 100644 index 0000000000..c01cdca612 --- /dev/null +++ b/cmd/guacone/cmd/gcs.go @@ -0,0 +1,190 @@ +// +// Copyright 2022 The GUAC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + "time" + + "cloud.google.com/go/storage" + "github.com/guacsec/guac/pkg/cli" + csub_client "github.com/guacsec/guac/pkg/collectsub/client" + "github.com/guacsec/guac/pkg/handler/collector" + "github.com/guacsec/guac/pkg/handler/collector/gcs" + "github.com/guacsec/guac/pkg/handler/processor" + "github.com/guacsec/guac/pkg/logging" + "github.com/guacsec/guac/pkg/version" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "google.golang.org/api/option" +) + +type gcsOptions struct { + graphqlEndpoint string + csubAddr string + bucket string +} + +const gcsCredentialsPathFlag = "gcp-credentials-path" + +var gcsCmd = &cobra.Command{ + Use: "gcs [flags] bucket_name", + Short: "takes SBOMs and attestations from a Google Cloud Storage bucket and injects them to GUAC graph. This command talks directly to the graphQL endpoint", + Example: "guacone collect gcs my-bucket --gcs-credentials-path /secret/sa.json", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + ctx := logging.WithLogger(context.Background()) + logger := logging.FromContext(ctx) + + opts, err := validateGCSFlags(viper.GetString("gql-addr"), viper.GetString("csub-addr"), viper.GetString(gcsCredentialsPathFlag), args) + if err != nil { + fmt.Printf("unable to validate flags: %v\n", err) + _ = cmd.Help() + os.Exit(1) + } + + gcsOpts := []option.ClientOption{ + option.WithUserAgent(version.UserAgent), + } + + // Credential flag is not mandatory since they can also be loaded from + // the environment variable GOOGLE_APPLICATION_CREDENTIALS by the client, by default + if credsPath := viper.GetString(gcsCredentialsPathFlag); credsPath != "" { + gcsOpts = append(gcsOpts, option.WithCredentialsFile(credsPath)) + } + + client, err := storage.NewClient(ctx, gcsOpts...) + if err != nil { + logger.Fatalf("creating client: %v", err) + } + + // Register collector by providing a new GCS Client and bucket name + gcsCollector, err := gcs.NewGCSCollector(gcs.WithBucket(opts.bucket), gcs.WithClient(client)) + if err != nil { + logger.Fatalf("unable to create gcs client: %v", err) + } + + err = collector.RegisterDocumentCollector(gcsCollector, gcs.CollectorGCS) + if err != nil { + logger.Fatalf("unable to register gcs collector: %v", err) + } + + // initialize collectsub client + csubClient, err := csub_client.NewClient(opts.csubAddr) + if err != nil { + logger.Infof("collectsub client initialization failed, this ingestion will not pull in any additional data through the collectsub service: %v", err) + csubClient = nil + } else { + defer csubClient.Close() + } + + // TODO: this code could be extracted to a function and reused by the + // other collectors (files and oci) + // Get pipeline of components + processorFunc := getProcessor(ctx) + ingestorFunc := getIngestor(ctx) + collectSubEmitFunc := getCollectSubEmit(ctx, csubClient) + assemblerFunc := getAssembler(ctx, opts.graphqlEndpoint) + + totalNum := 0 + gotErr := false + // Set emit function to go through the entire pipeline + emit := func(d *processor.Document) error { + totalNum += 1 + start := time.Now() + + docTree, err := processorFunc(d) + if err != nil { + gotErr = true + return fmt.Errorf("unable to process doc: %v, format: %v, document: %v", err, d.Format, d.Type) + } + + predicates, idstrings, err := ingestorFunc(docTree) + if err != nil { + gotErr = true + return fmt.Errorf("unable to ingest doc tree: %v", err) + } + + err = collectSubEmitFunc(idstrings) + if err != nil { + logger.Infof("unable to create entries in collectsub server, but continuing: %v", err) + } + + err = assemblerFunc(predicates) + if err != nil { + gotErr = true + return fmt.Errorf("unable to assemble graphs: %v", err) + } + t := time.Now() + elapsed := t.Sub(start) + logger.Infof("[%v] completed doc %+v", elapsed, d.SourceInformation) + return nil + } + + // Collect + errHandler := func(err error) bool { + if err == nil { + logger.Info("collector ended gracefully") + return true + } + logger.Errorf("collector ended with error: %v", err) + return false + } + if err := collector.Collect(ctx, emit, errHandler); err != nil { + logger.Fatal(err) + } + + if gotErr { + logger.Fatalf("completed ingestion with errors") + } else { + logger.Infof("completed ingesting %v documents", totalNum) + } + }, +} + +func validateGCSFlags(gqlEndpoint, csubAddr, credentialsPath string, args []string) (gcsOptions, error) { + var opts gcsOptions + opts.graphqlEndpoint = gqlEndpoint + opts.csubAddr = csubAddr + + if len(args) < 1 { + return opts, fmt.Errorf("expected positional argument: bucket") + } + opts.bucket = args[0] + + if credentialsPath == "" && os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" { + return opts, fmt.Errorf("expected either --%s flag or GOOGLE_APPLICATION_CREDENTIALS environment variable", gcsCredentialsPathFlag) + } + + return opts, nil +} + +func init() { + set, err := cli.BuildFlags([]string{gcsCredentialsPathFlag}) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err) + os.Exit(1) + } + gcsCmd.Flags().AddFlagSet(set) + if err := viper.BindPFlags(gcsCmd.Flags()); err != nil { + fmt.Fprintf(os.Stderr, "failed to bind flags: %v", err) + os.Exit(1) + } + + collectCmd.AddCommand(gcsCmd) +} diff --git a/cmd/guacone/cmd/gcs_test.go b/cmd/guacone/cmd/gcs_test.go new file mode 100644 index 0000000000..b0bec89812 --- /dev/null +++ b/cmd/guacone/cmd/gcs_test.go @@ -0,0 +1,80 @@ +// +// Copyright 2022 The GUAC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import "testing" + +func TestValidateGCSFlags(t *testing.T) { + testCases := []struct { + name string + args []string + credentialsPath string + credsEnvVarSet bool + errorMsg string + }{ + { + name: "no args", + errorMsg: "expected positional argument: bucket", + }, + { + name: "no credentials", + args: []string{"bucket"}, + errorMsg: "expected either --gcp-credentials-path flag or GOOGLE_APPLICATION_CREDENTIALS environment variable", + }, + { + name: "credentials path and env var set", + args: []string{"bucket"}, + credentialsPath: "/path/to/creds.json", + credsEnvVarSet: true, + errorMsg: "", + }, + { + name: "credentials path and env var not set", + args: []string{"bucket"}, + credentialsPath: "/path/to/creds.json", + }, + { + name: "credentials path not set and env var set", + args: []string{"bucket"}, + credsEnvVarSet: true, + errorMsg: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.credsEnvVarSet { + t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/creds.json") + } + + o, err := validateGCSFlags("", "", tc.credentialsPath, tc.args) + if err != nil { + if tc.errorMsg != err.Error() { + t.Errorf("expected error message: %s, got: %s", tc.errorMsg, err.Error()) + } + } else { + if tc.errorMsg != "" { + t.Errorf("expected error message: %s, got: %s", tc.errorMsg, err.Error()) + } + + if o.bucket != tc.args[0] { + t.Errorf("expected bucket: %s, got: %s", tc.args[0], o.bucket) + } + } + }) + } + +} diff --git a/pkg/cli/store.go b/pkg/cli/store.go index c5d37ec3fd..730a690aac 100644 --- a/pkg/cli/store.go +++ b/pkg/cli/store.go @@ -65,6 +65,9 @@ func init() { set.StringP("vuln-id", "v", "", "CVE, GHSA or OSV ID to check") set.Int("num-path", 0, "number of paths to return, 0 means all paths") + // Google Cloud platform flags + set.String("gcp-credentials-path", "", "Path to the Google Cloud service account credentials json file.\nAlternatively you can set GOOGLE_APPLICATION_CREDENTIALS= in your environment.") + set.VisitAll(func(f *pflag.Flag) { flagStore[f.Name] = f }) diff --git a/pkg/handler/collector/gcs/gcs.go b/pkg/handler/collector/gcs/gcs.go index 1ee238769c..48c848969f 100644 --- a/pkg/handler/collector/gcs/gcs.go +++ b/pkg/handler/collector/gcs/gcs.go @@ -20,71 +20,67 @@ import ( "errors" "fmt" "io" - "os" "time" "cloud.google.com/go/storage" "google.golang.org/api/iterator" - "google.golang.org/api/option" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" - "github.com/guacsec/guac/pkg/version" ) type gcs struct { bucket string reader gcsReader + client *storage.Client lastDownload time.Time poll bool interval time.Duration } -const ( - // gcsCredsEnv is the env variable to hold the json creds file - gcsCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS" - // Specify the GCS bucket address - bucketEnv = "GCS_BUCKET_ADDRESS" - CollectorGCS = "GCS" -) +const CollectorGCS = "GCS" -func getBucketPath() string { - if env := os.Getenv(bucketEnv); env != "" { - return env - } - return "" -} +// NewGCSCollector initializes the gcs and sets it for polling or one time run +func NewGCSCollector(opts ...Opt) (*gcs, error) { + gstore := &gcs{} -func getCredsPath() string { - if env := os.Getenv(gcsCredsEnv); env != "" { - return env + for _, opt := range opts { + opt(gstore) } - return "" -} -// NewGCSClient initializes the gcs and sets it for polling or one time run -func NewGCSClient(ctx context.Context, poll bool, interval time.Duration) (*gcs, error) { - // TODO: Change to pass in token via command line - if getCredsPath() == "" { + // Set reader using both the client and bucket + gstore.reader = &reader{client: gstore.client, bucket: gstore.bucket} + + if gstore.bucket == "" { return nil, errors.New("gcs bucket not specified") } - client, err := storage.NewClient(ctx, - option.WithCredentialsFile(os.Getenv(gcsCredsEnv)), - option.WithUserAgent(version.UserAgent)) - if err != nil { - return nil, err + + if gstore.client == nil { + return nil, errors.New("gcs client not specified") } - bucket := getBucketPath() - if bucket == "" { - return nil, errors.New("gcs bucket not specified") + + return gstore, nil +} + +type Opt func(*gcs) + +func WithPolling(interval time.Duration) Opt { + return func(g *gcs) { + g.poll = true + g.interval = interval + } +} + +func WithClient(client *storage.Client) Opt { + return func(g *gcs) { + g.client = client } - gstore := &gcs{ - bucket: bucket, - reader: &reader{client: client, bucket: bucket}, - poll: poll, - interval: interval, +} + +func WithBucket(bucket string) Opt { + return func(g *gcs) { + g.bucket = bucket } - return gstore, nil } // Type is the collector type of the collector @@ -174,7 +170,7 @@ func (g *gcs) getArtifacts(ctx context.Context, docChannel chan<- *processor.Doc if g.lastDownload.IsZero() || attrs.Updated.After(g.lastDownload) { payload, err := g.getObject(ctx, attrs.Name) if err != nil { - logger.Warnf("failed to retrieve object: %s from bucket: %s", attrs.Name, g.bucket) + logger.Warnf("failed to retrieve object: %s from bucket: %s, error: %w", attrs.Name, g.bucket, err) continue } if len(payload) == 0 { diff --git a/pkg/handler/collector/gcs/gcs_test.go b/pkg/handler/collector/gcs/gcs_test.go index b0039dc7ce..58a90f0fbd 100644 --- a/pkg/handler/collector/gcs/gcs_test.go +++ b/pkg/handler/collector/gcs/gcs_test.go @@ -18,23 +18,23 @@ package gcs import ( "context" "errors" - "os" "reflect" "testing" "time" + "cloud.google.com/go/storage" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/guacsec/guac/pkg/handler/collector" "github.com/guacsec/guac/pkg/handler/processor" ) func TestGCS_RetrieveArtifacts(t *testing.T) { - os.Setenv("GCS_BUCKET_ADDRESS", "some-bucket") + const bucketName = "some-bucket" ctx := context.Background() server := fakestorage.NewServer([]fakestorage.Object{ { ObjectAttrs: fakestorage.ObjectAttrs{ - BucketName: "some-bucket", + BucketName: bucketName, Name: "some/object/file.txt", Updated: time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC), }, @@ -50,7 +50,7 @@ func TestGCS_RetrieveArtifacts(t *testing.T) { Format: processor.FormatUnknown, SourceInformation: processor.SourceInformation{ Collector: string(CollectorGCS), - Source: getBucketPath() + "/some/object/file.txt", + Source: bucketName + "/some/object/file.txt", }, } @@ -73,8 +73,8 @@ func TestGCS_RetrieveArtifacts(t *testing.T) { }, { name: "get object", fields: fields{ - bucket: getBucketPath(), - reader: &reader{client: client, bucket: getBucketPath()}, + bucket: bucketName, + reader: &reader{client: client, bucket: bucketName}, }, want: []*processor.Document{doc}, wantErr: false, @@ -82,8 +82,8 @@ func TestGCS_RetrieveArtifacts(t *testing.T) { }, { name: "last download time the same", fields: fields{ - bucket: getBucketPath(), - reader: &reader{client: client, bucket: getBucketPath()}, + bucket: bucketName, + reader: &reader{client: client, bucket: bucketName}, lastDownload: time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC), }, want: nil, @@ -92,14 +92,15 @@ func TestGCS_RetrieveArtifacts(t *testing.T) { }, { name: "last download time set before", fields: fields{ - bucket: getBucketPath(), - reader: &reader{client: client, bucket: getBucketPath()}, + bucket: bucketName, + reader: &reader{client: client, bucket: bucketName}, lastDownload: time.Date(2009, 10, 17, 20, 34, 58, 651387237, time.UTC), }, want: []*processor.Document{doc}, wantErr: false, wantDone: true, }} + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := &gcs{ @@ -138,3 +139,85 @@ func TestGCS_RetrieveArtifacts(t *testing.T) { }) } } + +func TestNewGCSCollector(t *testing.T) { + var client = &storage.Client{} + + type args struct { + bucket string + pollInterval time.Duration + client *storage.Client + } + + tests := []struct { + name string + args args + want *gcs + wantErr bool + }{ + { + name: "no bucket", + args: args{}, + wantErr: true, + }, + { + name: "no client", + args: args{bucket: "some-bucket"}, + wantErr: true, + }, + { + name: "client and bucket", + args: args{ + client: client, + bucket: "some-bucket", + }, + want: &gcs{ + bucket: "some-bucket", + client: client, + reader: &reader{bucket: "some-bucket", client: client}, + poll: false, + }, + wantErr: false, + }, { + name: "bucket and poll", + args: args{ + bucket: "some-bucket", + client: client, + pollInterval: 2 * time.Minute, + }, + want: &gcs{ + bucket: "some-bucket", + client: client, + reader: &reader{bucket: "some-bucket", client: client}, + poll: true, + interval: 2 * time.Minute, + }, + wantErr: false, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := []Opt{} + + if tt.args.pollInterval.Nanoseconds() > 0 { + opts = append(opts, WithPolling(tt.args.pollInterval)) + } + + if tt.args.bucket != "" { + opts = append(opts, WithBucket(tt.args.bucket)) + } + + if tt.args.client != nil { + opts = append(opts, WithClient(tt.args.client)) + } + + g, err := NewGCSCollector(opts...) + if (err != nil) != tt.wantErr { + t.Errorf("NewGCSCollector() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(g, tt.want) { + t.Errorf("NewGCSCollector() = %v, want %v", g, tt.want) + } + }) + } +}