Skip to content

Commit

Permalink
feat(collector): expose Google Cloud Storage collector from guacone C…
Browse files Browse the repository at this point in the history
…LI (#989)

* feat(collector): expose Google Cloud Storage collector from guacone CLI

Signed-off-by: Miguel Martinez Trivino <miguel@chainloop.dev>

* apply feedback

Signed-off-by: Miguel Martinez Trivino <miguel@chainloop.dev>

---------

Signed-off-by: Miguel Martinez Trivino <miguel@chainloop.dev>
  • Loading branch information
migmartri committed Jun 28, 2023
1 parent 9bfd464 commit 5f261e9
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 50 deletions.
190 changes: 190 additions & 0 deletions cmd/guacone/cmd/gcs.go
@@ -0,0 +1,190 @@
//
// Copyright 2022 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"
"fmt"
"os"
"time"

"cloud.google.com/go/storage"
"github.com/guacsec/guac/pkg/cli"
csub_client "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/handler/collector"
"github.com/guacsec/guac/pkg/handler/collector/gcs"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/logging"
"github.com/guacsec/guac/pkg/version"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"google.golang.org/api/option"
)

type gcsOptions struct {
graphqlEndpoint string
csubAddr string
bucket string
}

const gcsCredentialsPathFlag = "gcp-credentials-path"

var gcsCmd = &cobra.Command{
Use: "gcs [flags] bucket_name",
Short: "takes SBOMs and attestations from a Google Cloud Storage bucket and injects them to GUAC graph. This command talks directly to the graphQL endpoint",
Example: "guacone collect gcs my-bucket --gcs-credentials-path /secret/sa.json",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ctx := logging.WithLogger(context.Background())
logger := logging.FromContext(ctx)

opts, err := validateGCSFlags(viper.GetString("gql-addr"), viper.GetString("csub-addr"), viper.GetString(gcsCredentialsPathFlag), args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
_ = cmd.Help()
os.Exit(1)
}

gcsOpts := []option.ClientOption{
option.WithUserAgent(version.UserAgent),
}

// Credential flag is not mandatory since they can also be loaded from
// the environment variable GOOGLE_APPLICATION_CREDENTIALS by the client, by default
if credsPath := viper.GetString(gcsCredentialsPathFlag); credsPath != "" {
gcsOpts = append(gcsOpts, option.WithCredentialsFile(credsPath))
}

client, err := storage.NewClient(ctx, gcsOpts...)
if err != nil {
logger.Fatalf("creating client: %v", err)
}

// Register collector by providing a new GCS Client and bucket name
gcsCollector, err := gcs.NewGCSCollector(gcs.WithBucket(opts.bucket), gcs.WithClient(client))
if err != nil {
logger.Fatalf("unable to create gcs client: %v", err)
}

err = collector.RegisterDocumentCollector(gcsCollector, gcs.CollectorGCS)
if err != nil {
logger.Fatalf("unable to register gcs collector: %v", err)
}

// initialize collectsub client
csubClient, err := csub_client.NewClient(opts.csubAddr)
if err != nil {
logger.Infof("collectsub client initialization failed, this ingestion will not pull in any additional data through the collectsub service: %v", err)
csubClient = nil
} else {
defer csubClient.Close()
}

// TODO: this code could be extracted to a function and reused by the
// other collectors (files and oci)
// Get pipeline of components
processorFunc := getProcessor(ctx)
ingestorFunc := getIngestor(ctx)
collectSubEmitFunc := getCollectSubEmit(ctx, csubClient)
assemblerFunc := getAssembler(ctx, opts.graphqlEndpoint)

totalNum := 0
gotErr := false
// Set emit function to go through the entire pipeline
emit := func(d *processor.Document) error {
totalNum += 1
start := time.Now()

docTree, err := processorFunc(d)
if err != nil {
gotErr = true
return fmt.Errorf("unable to process doc: %v, format: %v, document: %v", err, d.Format, d.Type)
}

predicates, idstrings, err := ingestorFunc(docTree)
if err != nil {
gotErr = true
return fmt.Errorf("unable to ingest doc tree: %v", err)
}

err = collectSubEmitFunc(idstrings)
if err != nil {
logger.Infof("unable to create entries in collectsub server, but continuing: %v", err)
}

err = assemblerFunc(predicates)
if err != nil {
gotErr = true
return fmt.Errorf("unable to assemble graphs: %v", err)
}
t := time.Now()
elapsed := t.Sub(start)
logger.Infof("[%v] completed doc %+v", elapsed, d.SourceInformation)
return nil
}

// Collect
errHandler := func(err error) bool {
if err == nil {
logger.Info("collector ended gracefully")
return true
}
logger.Errorf("collector ended with error: %v", err)
return false
}
if err := collector.Collect(ctx, emit, errHandler); err != nil {
logger.Fatal(err)
}

if gotErr {
logger.Fatalf("completed ingestion with errors")
} else {
logger.Infof("completed ingesting %v documents", totalNum)
}
},
}

func validateGCSFlags(gqlEndpoint, csubAddr, credentialsPath string, args []string) (gcsOptions, error) {
var opts gcsOptions
opts.graphqlEndpoint = gqlEndpoint
opts.csubAddr = csubAddr

if len(args) < 1 {
return opts, fmt.Errorf("expected positional argument: bucket")
}
opts.bucket = args[0]

if credentialsPath == "" && os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
return opts, fmt.Errorf("expected either --%s flag or GOOGLE_APPLICATION_CREDENTIALS environment variable", gcsCredentialsPathFlag)
}

return opts, nil
}

func init() {
set, err := cli.BuildFlags([]string{gcsCredentialsPathFlag})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
}
gcsCmd.Flags().AddFlagSet(set)
if err := viper.BindPFlags(gcsCmd.Flags()); err != nil {
fmt.Fprintf(os.Stderr, "failed to bind flags: %v", err)
os.Exit(1)
}

collectCmd.AddCommand(gcsCmd)
}
80 changes: 80 additions & 0 deletions cmd/guacone/cmd/gcs_test.go
@@ -0,0 +1,80 @@
//
// Copyright 2022 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import "testing"

func TestValidateGCSFlags(t *testing.T) {
testCases := []struct {
name string
args []string
credentialsPath string
credsEnvVarSet bool
errorMsg string
}{
{
name: "no args",
errorMsg: "expected positional argument: bucket",
},
{
name: "no credentials",
args: []string{"bucket"},
errorMsg: "expected either --gcp-credentials-path flag or GOOGLE_APPLICATION_CREDENTIALS environment variable",
},
{
name: "credentials path and env var set",
args: []string{"bucket"},
credentialsPath: "/path/to/creds.json",
credsEnvVarSet: true,
errorMsg: "",
},
{
name: "credentials path and env var not set",
args: []string{"bucket"},
credentialsPath: "/path/to/creds.json",
},
{
name: "credentials path not set and env var set",
args: []string{"bucket"},
credsEnvVarSet: true,
errorMsg: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.credsEnvVarSet {
t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/creds.json")
}

o, err := validateGCSFlags("", "", tc.credentialsPath, tc.args)
if err != nil {
if tc.errorMsg != err.Error() {
t.Errorf("expected error message: %s, got: %s", tc.errorMsg, err.Error())
}
} else {
if tc.errorMsg != "" {
t.Errorf("expected error message: %s, got: %s", tc.errorMsg, err.Error())
}

if o.bucket != tc.args[0] {
t.Errorf("expected bucket: %s, got: %s", tc.args[0], o.bucket)
}
}
})
}

}
3 changes: 3 additions & 0 deletions pkg/cli/store.go
Expand Up @@ -65,6 +65,9 @@ func init() {
set.StringP("vuln-id", "v", "", "CVE, GHSA or OSV ID to check")
set.Int("num-path", 0, "number of paths to return, 0 means all paths")

// Google Cloud platform flags
set.String("gcp-credentials-path", "", "Path to the Google Cloud service account credentials json file.\nAlternatively you can set GOOGLE_APPLICATION_CREDENTIALS=<path> in your environment.")

set.VisitAll(func(f *pflag.Flag) {
flagStore[f.Name] = f
})
Expand Down
76 changes: 36 additions & 40 deletions pkg/handler/collector/gcs/gcs.go
Expand Up @@ -20,71 +20,67 @@ import (
"errors"
"fmt"
"io"
"os"
"time"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/logging"
"github.com/guacsec/guac/pkg/version"
)

type gcs struct {
bucket string
reader gcsReader
client *storage.Client
lastDownload time.Time
poll bool
interval time.Duration
}

const (
// gcsCredsEnv is the env variable to hold the json creds file
gcsCredsEnv = "GOOGLE_APPLICATION_CREDENTIALS"
// Specify the GCS bucket address
bucketEnv = "GCS_BUCKET_ADDRESS"
CollectorGCS = "GCS"
)
const CollectorGCS = "GCS"

func getBucketPath() string {
if env := os.Getenv(bucketEnv); env != "" {
return env
}
return ""
}
// NewGCSCollector initializes the gcs and sets it for polling or one time run
func NewGCSCollector(opts ...Opt) (*gcs, error) {
gstore := &gcs{}

func getCredsPath() string {
if env := os.Getenv(gcsCredsEnv); env != "" {
return env
for _, opt := range opts {
opt(gstore)
}
return ""
}

// NewGCSClient initializes the gcs and sets it for polling or one time run
func NewGCSClient(ctx context.Context, poll bool, interval time.Duration) (*gcs, error) {
// TODO: Change to pass in token via command line
if getCredsPath() == "" {
// Set reader using both the client and bucket
gstore.reader = &reader{client: gstore.client, bucket: gstore.bucket}

if gstore.bucket == "" {
return nil, errors.New("gcs bucket not specified")
}
client, err := storage.NewClient(ctx,
option.WithCredentialsFile(os.Getenv(gcsCredsEnv)),
option.WithUserAgent(version.UserAgent))
if err != nil {
return nil, err

if gstore.client == nil {
return nil, errors.New("gcs client not specified")
}
bucket := getBucketPath()
if bucket == "" {
return nil, errors.New("gcs bucket not specified")

return gstore, nil
}

type Opt func(*gcs)

func WithPolling(interval time.Duration) Opt {
return func(g *gcs) {
g.poll = true
g.interval = interval
}
}

func WithClient(client *storage.Client) Opt {
return func(g *gcs) {
g.client = client
}
gstore := &gcs{
bucket: bucket,
reader: &reader{client: client, bucket: bucket},
poll: poll,
interval: interval,
}

func WithBucket(bucket string) Opt {
return func(g *gcs) {
g.bucket = bucket
}
return gstore, nil
}

// Type is the collector type of the collector
Expand Down Expand Up @@ -174,7 +170,7 @@ func (g *gcs) getArtifacts(ctx context.Context, docChannel chan<- *processor.Doc
if g.lastDownload.IsZero() || attrs.Updated.After(g.lastDownload) {
payload, err := g.getObject(ctx, attrs.Name)
if err != nil {
logger.Warnf("failed to retrieve object: %s from bucket: %s", attrs.Name, g.bucket)
logger.Warnf("failed to retrieve object: %s from bucket: %s, error: %w", attrs.Name, g.bucket, err)
continue
}
if len(payload) == 0 {
Expand Down

0 comments on commit 5f261e9

Please sign in to comment.