Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/blob store #2

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
FROM golang:1.18.1-alpine AS build
WORKDIR /src
RUN apk add --no-cache file git
RUN apk update && apk add --no-cache file git
ENV GOMODCACHE /root/.cache/gocache
RUN --mount=target=. --mount=target=/root/.cache,type=cache \
CGO_ENABLED=0 go build -o /out/archivist -ldflags '-s -d -w' ./cmd/archivist; \
COPY . .
RUN CGO_ENABLED=0 go build -o /out/archivist -ldflags '-s -d -w' ./cmd/archivist; \
file /out/archivist | grep "statically linked"

FROM alpine
Expand Down
91 changes: 53 additions & 38 deletions cmd/archivist/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package main

import (
"context"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/testifysec/archivist/internal/storage/blob"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -54,28 +56,16 @@ func main() {
)
defer cancel()

// ********************************************************************************
// Setup logging
// ********************************************************************************
logrus.SetFormatter(&nested.Formatter{})
log.EnableTracing(true)
ctx = log.WithLog(ctx, logruslogger.New(ctx, map[string]interface{}{"cmd": os.Args[0]}))

// ********************************************************************************
// Debug self if necessary
// ********************************************************************************

if err := debug.Self(); err != nil {
log.FromContext(ctx).Infof("%s", err)
}

startTime := time.Now()

// enumerating phases

// ********************************************************************************
log.FromContext(ctx).Infof("executing phase 1: get config from environment (time since start: %s)", time.Since(startTime))
// ********************************************************************************
now := time.Now()

cfg := new(config.Config)
Expand All @@ -90,52 +80,48 @@ func main() {
logrus.SetLevel(level)

log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 1: get config from environment")
// ********************************************************************************

log.FromContext(ctx).Infof("executing phase 2: get spiffe svid (time since start: %s)", time.Since(startTime))
// ********************************************************************************
now = time.Now()
var source *workloadapi.X509Source
var svid *x509svid.SVID

log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: retrieve spiffe svid")
grpcOptions := make([]grpc.ServerOption, 0)
if cfg.EnableSPIFFE == true {
source, err = workloadapi.NewX509Source(ctx)
if err != nil {
logrus.Fatalf("error getting x509 source: %+v", err)
}
svid, err = source.GetX509SVID()
if err != nil {
logrus.Fatalf("error getting x509 svid: %+v", err)
}
logrus.Infof("SVID: %q", svid.ID)
log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: retrieve spiffe svid")
opts := initSpiffeConnection(ctx, cfg)
grpcOptions = append(grpcOptions, opts...)
} else {
log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 2: SKIPPED")
}
// ********************************************************************************

log.FromContext(ctx).Infof("executing phase 3: initializing badger (time since start: %s)", time.Since(startTime))
// ********************************************************************************
now = time.Now()

store, storeCh, err := mysqlstore.NewServer(ctx, "")
graphStore, storeCh, err := mysqlstore.NewServer(ctx, cfg.SQLStoreConnectionString)
if err != nil {
logrus.Fatalf("error starting badger store: %+v", err)
}

log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 3: initializing badger")
// ********************************************************************************

log.FromContext(ctx).Infof("executing phase 4: create and register grpc service (time since start: %s)", time.Since(startTime))
// ********************************************************************************
now = time.Now()

grpcOptions := make([]grpc.ServerOption, 0)
if cfg.EnableSPIFFE == true {
grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsconfig.MTLSServerConfig(source, source, tlsconfig.AuthorizeAny()))))
blobStore, err := blob.NewMinioClient(
cfg.BlobStoreEndpoint,
cfg.BlobStoreAccessKeyId,
cfg.BlobStoreSecretAccessKeyId,
cfg.BlobStoreBucketName,
cfg.BlobStoreUseSSL,
)
if err != nil {
logrus.Fatalf("failed to create blob store client: %v", err)
}

grpcServer := grpc.NewServer(grpcOptions...)
archivistService := server.NewArchivistServer(store)
archivistService := server.NewArchivistServer(graphStore, blobStore)
archivist.RegisterArchivistServer(grpcServer, archivistService)

collectorService := server.NewCollectorServer(store)
collectorService := server.NewCollectorServer(graphStore, blobStore)
archivist.RegisterCollectorServer(grpcServer, collectorService)

srvErrCh := grpcutils.ListenAndServe(ctx, &cfg.ListenOn, grpcServer)
Expand All @@ -149,9 +135,38 @@ func main() {
<-srvErrCh
<-storeCh

//// ********************************************************************************
log.FromContext(ctx).Infof("exiting, uptime: %v", time.Since(startTime))
//// ********************************************************************************
}

func initSpiffeConnection(ctx context.Context, cfg *config.Config) []grpc.ServerOption {
var source *workloadapi.X509Source
var svid *x509svid.SVID
var authorizer tlsconfig.Authorizer

if cfg.SPIFFETrustedServerId != "" {
trustID := spiffeid.RequireFromString(cfg.SPIFFETrustedServerId)
authorizer = tlsconfig.AuthorizeID(trustID)
} else {
authorizer = tlsconfig.AuthorizeAny()
}

workloadOpts := []workloadapi.X509SourceOption{
workloadapi.WithClientOptions(workloadapi.WithAddr(cfg.SPIFFEAddress)),
}
source, err := workloadapi.NewX509Source(ctx, workloadOpts...)
if err != nil {
logrus.Fatalf("error getting x509 source: %+v", err)
}
opts := []grpc.ServerOption{
grpc.Creds(credentials.NewTLS(tlsconfig.MTLSServerConfig(source, source, authorizer))),
}

svid, err = source.GetX509SVID()
if err != nil {
logrus.Fatalf("error getting x509 svid: %+v", err)
}
logrus.Infof("SVID: %q", svid.ID)
return opts
}

func exitOnErrCh(ctx context.Context, cancel context.CancelFunc, errCh <-chan error) {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/agext/levenshtein v1.2.1 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/go-ini/ini v1.66.4 // indirect
github.com/go-logr/logr v1.2.1 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-openapi/inflect v0.19.0 // indirect
Expand All @@ -32,6 +33,8 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/minio/minio-go v6.0.14+incompatible // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect
github.com/networkservicemesh/api v1.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/ghodss/yaml v0.0.0-20180820084758-c7ce16629ff4/go.mod h1:4dBDuWmgqj2H
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/git-bom/gitbom-go v0.0.0-20220502033008-4a48bb2317f7 h1:unHPfG96U9hCpN3oOMPSecP8PjyGhMGw/osjFm90e4o=
github.com/git-bom/gitbom-go v0.0.0-20220502033008-4a48bb2317f7/go.mod h1:xZ9N3/niYCZoKvc7ZlHq54lYvnb/U7l8ORLYf0irM1I=
github.com/go-ini/ini v1.66.4 h1:dKjMqkcbkzfddhIhyglTPgMoJnkvmG+bSLrU9cTHc5M=
github.com/go-ini/ini v1.66.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA=
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -183,6 +185,10 @@ github.com/mattn/go-runewidth v0.0.0-20181025052659-b20a3daf6a39/go.mod h1:LwmH8
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mna/pigeon v0.0.0-20180808201053-bb0192cfc2ae/go.mod h1:Iym28+kJVnC1hfQvv5MUtI6AiFFzvQjHcvI4RFTG/04=
Expand Down
14 changes: 11 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,17 @@ import (
)

type Config struct {
EnableSPIFFE bool `default:"TRUE" desc:"Enable SPIFFE support" split_words:"true"`
ListenOn url.URL `default:"unix:///listen.on.socket" desc:"url to listen on" split_words:"true"`
LogLevel string `default:"INFO" desc:"Log level" split_words:"true"`
EnableSPIFFE bool `default:"TRUE" desc:"Enable SPIFFE support" split_words:"true"`
ListenOn url.URL `default:"unix:///listen.on.socket" desc:"url to listen on" split_words:"true"`
LogLevel string `default:"INFO" desc:"Log level" split_words:"true"`
SPIFFEAddress string `default:"unix:///tmp/spire-agent/public/api.sock" desc:"SPIFFE server address" split_words:"true"`
SPIFFETrustedServerId string `default:"" desc:"Trusted SPIFFE server ID; defaults to any" split_words:"true"`
SQLStoreConnectionString string `default:"root:example@tcp(db)/testify" desc:"SQL store connection string" split_words:"true"`
BlobStoreEndpoint string `default:"127.0.0.1:9000" desc:"URL endpoint for blob storage" split_words:"true"`
BlobStoreAccessKeyId string `default:"Blob store access key id" desc:"" split_words:"true"`
BlobStoreSecretAccessKeyId string `default:"Blob store secret access key id" desc:"" split_words:"true"`
BlobStoreUseSSL bool `default:"TRUE" desc:"Minio SSL toggle" split_words:"true"`
BlobStoreBucketName string `default:"" desc:"Blob store bucket name" split_words:"true"`
}

// Process reads config from env
Expand Down
55 changes: 46 additions & 9 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,84 @@
package server

import (
"bytes"
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/testifysec/archivist-api/pkg/api/archivist"
"github.com/testifysec/archivist/internal/storage/blob"
"google.golang.org/protobuf/types/known/emptypb"
"strings"
)

type archivistServer struct {
archivist.UnimplementedArchivistServer

store archivist.ArchivistServer
indexer blob.Indexer
store archivist.ArchivistServer
}

func NewArchivistServer(store archivist.ArchivistServer) archivist.ArchivistServer {
func NewArchivistServer(store archivist.ArchivistServer, indexer blob.Indexer) archivist.ArchivistServer {
return &archivistServer{
store: store,
store: store,
indexer: indexer,
}
}
func (s *archivistServer) GetBySubjectDigest(ctx context.Context, request *archivist.GetBySubjectDigestRequest) (*archivist.GetBySubjectDigestResponse, error) {
logrus.WithContext(ctx).Printf("retrieving by subject... ")
return s.store.GetBySubjectDigest(ctx, request)
resp, err := s.store.GetBySubjectDigest(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to retrieve subject by digest: %v", err)
}

shas := resp.GetObject()
logrus.WithContext(ctx).Printf("shas fetched: %s", shas)
attestations := make([]string, 0)

for _, gitbomSha := range shas {
obj, err := s.indexer.GetBlob(gitbomSha)
if err != nil {
return nil, fmt.Errorf("failed fetching ref by %s from store: %v", gitbomSha, err)
}
attestations = append(attestations, strings.TrimSpace(string(bytes.Trim(obj, "\x00"))))
}
return &archivist.GetBySubjectDigestResponse{Object: attestations}, nil
}

type collectorServer struct {
archivist.UnimplementedCollectorServer

store archivist.CollectorServer
indexer blob.Indexer
store archivist.CollectorServer
}

func NewCollectorServer(store archivist.CollectorServer) archivist.CollectorServer {
func NewCollectorServer(store archivist.CollectorServer, indexer blob.Indexer) archivist.CollectorServer {
return &collectorServer{
store: store,
store: store,
indexer: indexer,
}
}

// Store stores the dsse envelope and its relationships into the backend stores
func (s *collectorServer) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) {
fmt.Println("middleware: store")

res, err := s.store.Store(ctx, request)
if err != nil {
logrus.WithContext(ctx).Printf("received error from database: %+v", err)
return nil, err
}

envBytes := []byte(request.Object)
ref, err := s.indexer.GetRef(envBytes)
if err != nil {
logrus.WithContext(ctx).Printf("failed to get ref for envelope: %v", err)
return nil, err
}

err = s.indexer.PutBlob(ref, envBytes)
if err != nil {
logrus.WithContext(ctx).Printf("failed to put blob in store for envelope: %v", err)
return nil, err
}

return res, nil
}
Loading