From 9cb2cd08530fc19e365cf91eb55c9716eca92fc8 Mon Sep 17 00:00:00 2001 From: Jess Bodzo Date: Wed, 4 May 2022 14:37:54 -0400 Subject: [PATCH 1/3] Add in minio blob store and indexer --- Dockerfile | 9 +- cmd/archivist/main.go | 100 +++++++++++++--------- go.mod | 3 + go.sum | 6 ++ internal/config/config.go | 14 ++- internal/server/server.go | 55 ++++++++++-- internal/storage/blob/minio.go | 123 +++++++++++++++++++++++++++ internal/storage/mysqlstore/mysql.go | 2 +- 8 files changed, 257 insertions(+), 55 deletions(-) create mode 100644 internal/storage/blob/minio.go diff --git a/Dockerfile b/Dockerfile index 4738b93c..5e475826 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,10 @@ FROM golang:1.18.1-alpine AS build WORKDIR /src -RUN apk add --no-cache file git -ENV GOMODCACHE /root/.cache/gocache -RUN --mount=target=. --mount=target=/root/.cache,type=cache \ - CGO_ENABLED=0 go build -o /out/archivist -ldflags '-s -d -w' ./cmd/archivist; \ +RUN apk update && apk add --no-cache file git +# ENV GOMODCACHE /root/.cache/gocache +# RUN --mount=target=. --mount=target=/root/.cache,type=cache \ +COPY . . 
+RUN CGO_ENABLED=0 go build -o /out/archivist -ldflags '-s -d -w' ./cmd/archivist; \ file /out/archivist | grep "statically linked" FROM alpine diff --git a/cmd/archivist/main.go b/cmd/archivist/main.go index d8073993..bebcace4 100755 --- a/cmd/archivist/main.go +++ b/cmd/archivist/main.go @@ -21,6 +21,8 @@ package main import ( "context" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/testifysec/archivist/internal/storage/blob" "os" "os/signal" "syscall" @@ -54,28 +56,16 @@ func main() { ) defer cancel() - // ******************************************************************************** - // Setup logging - // ******************************************************************************** logrus.SetFormatter(&nested.Formatter{}) log.EnableTracing(true) ctx = log.WithLog(ctx, logruslogger.New(ctx, map[string]interface{}{"cmd": os.Args[0]})) - // ******************************************************************************** - // Debug self if necessary - // ******************************************************************************** - if err := debug.Self(); err != nil { log.FromContext(ctx).Infof("%s", err) } - startTime := time.Now() - // enumerating phases - - // ******************************************************************************** log.FromContext(ctx).Infof("executing phase 1: get config from environment (time since start: %s)", time.Since(startTime)) - // ******************************************************************************** now := time.Now() cfg := new(config.Config) @@ -90,52 +80,48 @@ func main() { logrus.SetLevel(level) log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 1: get config from environment") - // ******************************************************************************** + log.FromContext(ctx).Infof("executing phase 2: get spiffe svid (time since start: %s)", time.Since(startTime)) - // ******************************************************************************** now 
= time.Now() - var source *workloadapi.X509Source - var svid *x509svid.SVID + + log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: retrieve spiffe svid") + grpcOptions := make([]grpc.ServerOption, 0) if cfg.EnableSPIFFE == true { - source, err = workloadapi.NewX509Source(ctx) - if err != nil { - logrus.Fatalf("error getting x509 source: %+v", err) - } - svid, err = source.GetX509SVID() - if err != nil { - logrus.Fatalf("error getting x509 svid: %+v", err) - } - logrus.Infof("SVID: %q", svid.ID) - log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: retrieve spiffe svid") + opts := initSpiffeConnection(ctx, cfg) + grpcOptions = append(grpcOptions, opts...) } else { log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: SKIPPED") } - // ******************************************************************************** + log.FromContext(ctx).Infof("executing phase 3: initializing badger (time since start: %s)", time.Since(startTime)) - // ******************************************************************************** now = time.Now() - store, storeCh, err := mysqlstore.NewServer(ctx, "") + graphStore, storeCh, err := mysqlstore.NewServer(ctx, cfg.SQLStoreConnectionString) if err != nil { logrus.Fatalf("error starting badger store: %+v", err) } log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 3: initializing badger") - // ******************************************************************************** + log.FromContext(ctx).Infof("executing phase 4: create and register grpc service (time since start: %s)", time.Since(startTime)) - // ******************************************************************************** now = time.Now() - grpcOptions := make([]grpc.ServerOption, 0) - if cfg.EnableSPIFFE == true { - grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsconfig.MTLSServerConfig(source, source, 
tlsconfig.AuthorizeAny())))) + blobStore, err := blob.NewMinioClient( + cfg.BlobStoreEndpoint, + cfg.BlobStoreAccessKeyId, + cfg.BlobStoreSecretAccessKeyId, + cfg.BlobStoreBucketName, + cfg.BlobStoreUseSSL, + ) + if err != nil { + logrus.Fatalf("failed to create blob store client: %v", err) } grpcServer := grpc.NewServer(grpcOptions...) - archivistService := server.NewArchivistServer(store) + archivistService := server.NewArchivistServer(graphStore, blobStore) archivist.RegisterArchivistServer(grpcServer, archivistService) - collectorService := server.NewCollectorServer(store) + collectorService := server.NewCollectorServer(graphStore, blobStore) archivist.RegisterCollectorServer(grpcServer, collectorService) srvErrCh := grpcutils.ListenAndServe(ctx, &cfg.ListenOn, grpcServer) @@ -149,9 +135,47 @@ func main() { <-srvErrCh <-storeCh - //// ******************************************************************************** log.FromContext(ctx).Infof("exiting, uptime: %v", time.Since(startTime)) - //// ******************************************************************************** +} + +func initSpiffeConnection(ctx context.Context, cfg *config.Config) []grpc.ServerOption { + var source *workloadapi.X509Source + var svid *x509svid.SVID + var authorizer tlsconfig.Authorizer + + if cfg.SPIFFETrustedServerId != "" { + trustID := spiffeid.RequireFromString(cfg.SPIFFETrustedServerId) + authorizer = tlsconfig.AuthorizeID(trustID) + } else { + authorizer = tlsconfig.AuthorizeAny() + } + + picker := func(ids []*x509svid.SVID) *x509svid.SVID { + for _, id := range ids { + if id.ID.String() == "spiffe://witness.com/collector" { + return id + } + } + return nil + } + workloadOpts := []workloadapi.X509SourceOption{ + workloadapi.WithDefaultX509SVIDPicker(picker), + workloadapi.WithClientOptions(workloadapi.WithAddr(cfg.SPIFFEAddress)), + } + source, err := workloadapi.NewX509Source(ctx, workloadOpts...) 
+ if err != nil { + logrus.Fatalf("error getting x509 source: %+v", err) + } + opts := []grpc.ServerOption{ + grpc.Creds(credentials.NewTLS(tlsconfig.MTLSServerConfig(source, source, authorizer))), + } + + svid, err = source.GetX509SVID() + if err != nil { + logrus.Fatalf("error getting x509 svid: %+v", err) + } + logrus.Infof("SVID: %q", svid.ID) + return opts } func exitOnErrCh(ctx context.Context, cancel context.CancelFunc, errCh <-chan error) { diff --git a/go.mod b/go.mod index a6f455be..8b3c8931 100755 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/agext/levenshtein v1.2.1 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect + github.com/go-ini/ini v1.66.4 // indirect github.com/go-logr/logr v1.2.1 // indirect github.com/go-logr/stdr v1.2.0 // indirect github.com/go-openapi/inflect v0.19.0 // indirect @@ -32,6 +33,8 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hashicorp/hcl/v2 v2.10.0 // indirect + github.com/minio/minio-go v6.0.14+incompatible // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect github.com/networkservicemesh/api v1.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 2cb80f43..44e104b2 100755 --- a/go.sum +++ b/go.sum @@ -73,6 +73,8 @@ github.com/ghodss/yaml v0.0.0-20180820084758-c7ce16629ff4/go.mod h1:4dBDuWmgqj2H github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/git-bom/gitbom-go v0.0.0-20220502033008-4a48bb2317f7 h1:unHPfG96U9hCpN3oOMPSecP8PjyGhMGw/osjFm90e4o= github.com/git-bom/gitbom-go v0.0.0-20220502033008-4a48bb2317f7/go.mod h1:xZ9N3/niYCZoKvc7ZlHq54lYvnb/U7l8ORLYf0irM1I= +github.com/go-ini/ini v1.66.4 h1:dKjMqkcbkzfddhIhyglTPgMoJnkvmG+bSLrU9cTHc5M= +github.com/go-ini/ini v1.66.4/go.mod 
h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA= github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -183,6 +185,10 @@ github.com/mattn/go-runewidth v0.0.0-20181025052659-b20a3daf6a39/go.mod h1:LwmH8 github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= +github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM= github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mna/pigeon v0.0.0-20180808201053-bb0192cfc2ae/go.mod h1:Iym28+kJVnC1hfQvv5MUtI6AiFFzvQjHcvI4RFTG/04= diff --git a/internal/config/config.go b/internal/config/config.go index cbea0f46..d37b71cc 100755 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -20,9 +20,17 @@ import ( ) type Config struct { - EnableSPIFFE bool `default:"TRUE" desc:"Enable SPIFFE support" split_words:"true"` - ListenOn url.URL `default:"unix:///listen.on.socket" desc:"url to listen on" split_words:"true"` - LogLevel string `default:"INFO" desc:"Log level" split_words:"true"` + EnableSPIFFE bool `default:"TRUE" desc:"Enable SPIFFE support" split_words:"true"` + ListenOn url.URL 
`default:"unix:///listen.on.socket" desc:"url to listen on" split_words:"true"` + LogLevel string `default:"INFO" desc:"Log level" split_words:"true"` + SPIFFEAddress string `default:"unix:///tmp/spire-agent/public/api.sock" desc:"SPIFFE server address" split_words:"true"` + SPIFFETrustedServerId string `default:"" desc:"Trusted SPIFFE server ID; defaults to any" split_words:"true"` + SQLStoreConnectionString string `default:"root:example@tcp(db)/testify" desc:"SQL store connection string" split_words:"true"` + BlobStoreEndpoint string `default:"127.0.0.1:9000" desc:"URL endpoint for blob storage" split_words:"true"` + BlobStoreAccessKeyId string `default:"Blob store access key id" desc:"" split_words:"true"` + BlobStoreSecretAccessKeyId string `default:"Blob store secret access key id" desc:"" split_words:"true"` + BlobStoreUseSSL bool `default:"TRUE" desc:"Minio SSL toggle" split_words:"true"` + BlobStoreBucketName string `default:"" desc:"Blob store bucket name" split_words:"true"` } // Process reads config from env diff --git a/internal/server/server.go b/internal/server/server.go index eb70d9b0..3adbcbbb 100755 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,47 +15,84 @@ package server import ( + "bytes" "context" "fmt" "github.com/sirupsen/logrus" "github.com/testifysec/archivist-api/pkg/api/archivist" + "github.com/testifysec/archivist/internal/storage/blob" "google.golang.org/protobuf/types/known/emptypb" + "strings" ) type archivistServer struct { archivist.UnimplementedArchivistServer - - store archivist.ArchivistServer + indexer blob.Indexer + store archivist.ArchivistServer } -func NewArchivistServer(store archivist.ArchivistServer) archivist.ArchivistServer { +func NewArchivistServer(store archivist.ArchivistServer, indexer blob.Indexer) archivist.ArchivistServer { return &archivistServer{ - store: store, + store: store, + indexer: indexer, } } func (s *archivistServer) GetBySubjectDigest(ctx context.Context, request 
*archivist.GetBySubjectDigestRequest) (*archivist.GetBySubjectDigestResponse, error) { logrus.WithContext(ctx).Printf("retrieving by subject... ") - return s.store.GetBySubjectDigest(ctx, request) + resp, err := s.store.GetBySubjectDigest(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to retrieve subject by digest: %v", err) + } + + shas := resp.GetObject() + logrus.WithContext(ctx).Printf("shas fetched: %s", shas) + attestations := make([]string, 0) + + for _, gitbomSha := range shas { + obj, err := s.indexer.GetBlob(gitbomSha) + if err != nil { + return nil, fmt.Errorf("failed fetching ref by %s from store: %v", gitbomSha, err) + } + attestations = append(attestations, strings.TrimSpace(string(bytes.Trim(obj, "\x00")))) + } + return &archivist.GetBySubjectDigestResponse{Object: attestations}, nil } type collectorServer struct { archivist.UnimplementedCollectorServer - - store archivist.CollectorServer + indexer blob.Indexer + store archivist.CollectorServer } -func NewCollectorServer(store archivist.CollectorServer) archivist.CollectorServer { +func NewCollectorServer(store archivist.CollectorServer, indexer blob.Indexer) archivist.CollectorServer { return &collectorServer{ - store: store, + store: store, + indexer: indexer, } } +// Store stores the dsse envelope and its relationships into the backend stores func (s *collectorServer) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) { fmt.Println("middleware: store") + res, err := s.store.Store(ctx, request) if err != nil { logrus.WithContext(ctx).Printf("received error from database: %+v", err) return nil, err } + + envBytes := []byte(request.Object) + ref, err := s.indexer.GetRef(envBytes) + if err != nil { + logrus.WithContext(ctx).Printf("failed to get ref for envelope: %v", err) + return nil, err + } + + err = s.indexer.PutBlob(ref, envBytes) + if err != nil { + logrus.WithContext(ctx).Printf("failed to put blob in store for envelope: %v", err) + return 
nil, err + } + return res, nil } diff --git a/internal/storage/blob/minio.go b/internal/storage/blob/minio.go new file mode 100644 index 00000000..86cf45f6 --- /dev/null +++ b/internal/storage/blob/minio.go @@ -0,0 +1,123 @@ +// Copyright 2022 The Archivist Contributors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blob + +import ( + "bytes" + "fmt" + "github.com/git-bom/gitbom-go" + "github.com/minio/minio-go" + "github.com/minio/minio-go/pkg/credentials" + "io" +) + +// Indexer calculates the index reference for an input blob, +// and gets/puts blobs at that index in and out of the backing +// blob storage. 
+type Indexer interface { + GetRef(obj []byte) (string, error) + GetBlob(idx string) ([]byte, error) + PutBlob(idx string, obj []byte) error +} + +type attestationBlobStore struct { + client *minio.Client + bucket string + location string +} + +// GetRef calculates the index reference for a given object +func (store *attestationBlobStore) GetRef(obj []byte) (string, error) { + gb := gitbom.NewSha256GitBom() + if err := gb.AddReference(obj, nil); err != nil { + return "", err + } + return gb.Identity(), nil +} + +// GetBlob retrieves an attestation from the backend store +func (store *attestationBlobStore) GetBlob(idx string) ([]byte, error) { + opt := minio.GetObjectOptions{} + chunkSize := 8 * 1024 + buf := make([]byte, chunkSize) + outBuf := bytes.NewBuffer(buf) + + obj, err := store.client.GetObject(store.bucket, idx, opt) + if err != nil { + return buf, err + } + + var n int64 + for { + _ = opt.SetRange(n, n+int64(chunkSize)-1) + readBytes, err := outBuf.ReadFrom(obj) + if err == nil { + return outBuf.Bytes(), nil + } + if err != nil { + if err == io.EOF { + _, err = outBuf.ReadFrom(bytes.NewReader(buf)) + break + } + } + + n += readBytes + _, err = outBuf.ReadFrom(bytes.NewReader(buf)) + if err != nil { + return buf, fmt.Errorf("failed to chunk blob: %v", err) + } + } + return []byte{}, fmt.Errorf("failed to read out object: %v", err) +} + +// PutBlob stores the attestation blob into the backend store +func (store *attestationBlobStore) PutBlob(idx string, obj []byte) error { + opt := minio.PutObjectOptions{} + size := int64(len(obj)) + n, err := store.client.PutObject(store.bucket, idx, bytes.NewReader(obj), size, opt) + if err != nil { + return fmt.Errorf("failed to put blob: %v", err) + } else if n != size { + return fmt.Errorf("failed to upload full blob: size %d != uploaded size %d", size, n) + } + return nil +} + +// NewMinioClient returns a reader/writer for storing/retrieving attestations +func NewMinioClient(endpoint, accessKeyId, secretAccessKeyId, 
bucketName string, useSSL bool) (Indexer, error) { + c, err := minio.NewWithOptions(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyId, secretAccessKeyId, ""), + Secure: useSSL, + }) + if err != nil { + return nil, err + } + + exists, err := c.BucketExists(bucketName) + if !exists || err != nil { + return nil, fmt.Errorf("failed to find bucket exists: %v", err) + } + + loc, err := c.GetBucketLocation(bucketName) + if err != nil { + return nil, err + } + + return &attestationBlobStore{ + client: c, + location: loc, + bucket: bucketName, + }, nil +} diff --git a/internal/storage/mysqlstore/mysql.go b/internal/storage/mysqlstore/mysql.go index a567305f..ef147bf3 100755 --- a/internal/storage/mysqlstore/mysql.go +++ b/internal/storage/mysqlstore/mysql.go @@ -50,7 +50,7 @@ type store struct { } func NewServer(ctx context.Context, connectionstring string) (UnifiedStorage, chan error, error) { - drv, err := sql.Open("mysql", "root:example@tcp(db)/testify") + drv, err := sql.Open("mysql", connectionstring) if err != nil { return nil, nil, err } From 3d9682f877d75ef19ea34574e147b13b033ab145 Mon Sep 17 00:00:00 2001 From: Jess Bodzo Date: Sat, 7 May 2022 19:56:30 -0400 Subject: [PATCH 2/3] Revert to go caching mod --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5e475826..d8aa3ee0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ FROM golang:1.18.1-alpine AS build WORKDIR /src RUN apk update && apk add --no-cache file git -# ENV GOMODCACHE /root/.cache/gocache -# RUN --mount=target=. --mount=target=/root/.cache,type=cache \ +ENV GOMODCACHE /root/.cache/gocache +RUN --mount=target=. --mount=target=/root/.cache,type=cache \ COPY . . 
RUN CGO_ENABLED=0 go build -o /out/archivist -ldflags '-s -d -w' ./cmd/archivist; \ file /out/archivist | grep "statically linked" From 6173254bb9452e3c2a02951ace7c28dd43f9fbfb Mon Sep 17 00:00:00 2001 From: Jess Bodzo Date: Sat, 7 May 2022 19:57:58 -0400 Subject: [PATCH 3/3] Remove picker func override for testing --- cmd/archivist/main.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cmd/archivist/main.go b/cmd/archivist/main.go index bebcace4..555f728e 100755 --- a/cmd/archivist/main.go +++ b/cmd/archivist/main.go @@ -150,16 +150,7 @@ func initSpiffeConnection(ctx context.Context, cfg *config.Config) []grpc.Server authorizer = tlsconfig.AuthorizeAny() } - picker := func(ids []*x509svid.SVID) *x509svid.SVID { - for _, id := range ids { - if id.ID.String() == "spiffe://witness.com/collector" { - return id - } - } - return nil - } workloadOpts := []workloadapi.X509SourceOption{ - workloadapi.WithDefaultX509SVIDPicker(picker), workloadapi.WithClientOptions(workloadapi.WithAddr(cfg.SPIFFEAddress)), } source, err := workloadapi.NewX509Source(ctx, workloadOpts...)