From 4dfa99320f98be06dd51bd20616b7c49bbdb116f Mon Sep 17 00:00:00 2001 From: Boris Popovschi Date: Sat, 16 Feb 2019 06:02:59 +0200 Subject: [PATCH] Add catalog to minio (#1040) * Implement cataloger interface for Minio * Catalog fix * Implemented Catalog method in the Minio storage package * code fmt fix * fmt fix * remove unused channel --- pkg/storage/minio/cataloger.go | 84 +++++++++++++++++++++++++++++++++ pkg/storage/minio/lister.go | 5 +- pkg/storage/minio/minio.go | 4 +- pkg/storage/minio/minio_test.go | 6 +-- 4 files changed, 92 insertions(+), 7 deletions(-) create mode 100644 pkg/storage/minio/cataloger.go diff --git a/pkg/storage/minio/cataloger.go b/pkg/storage/minio/cataloger.go new file mode 100644 index 000000000..a518aed57 --- /dev/null +++ b/pkg/storage/minio/cataloger.go @@ -0,0 +1,84 @@ +package minio + +import ( + "context" + "fmt" + "strings" + + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/observ" + "github.com/gomods/athens/pkg/paths" + "github.com/minio/minio-go" +) + +// Catalog implements the (./pkg/storage).Cataloger interface +// It returns a list of modules and versions contained in the storage +func (s *storageImpl) Catalog(ctx context.Context, token string, pageSize int) ([]paths.AllPathParams, string, error) { + const op errors.Op = "minio.Catalog" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + queryToken := token + res := make([]paths.AllPathParams, 0) + startAfter := token + token = "" + + count := pageSize + for count > 0 { + loo, err := s.minioCore.ListObjectsV2(s.bucketName, token, "", false, "", 0, startAfter) + if err != nil { + return nil, "", errors.E(op, err) + } + + m, lastKey := fetchModsAndVersions(loo.Contents, count) + + res = append(res, m...) + count -= len(m) + queryToken = lastKey + + if !loo.IsTruncated { // not truncated, there is no point in asking more + if count > 0 { // it means we reached the end, no subsequent requests are necessary + queryToken = "" + } + break + } + } + + return res, queryToken, nil +} + +func fetchModsAndVersions(objects []minio.ObjectInfo, elementsNum int) ([]paths.AllPathParams, string) { + res := make([]paths.AllPathParams, 0) + lastKey := "" + + for _, o := range objects { + if !strings.HasSuffix(o.Key, ".info") { + continue + } + + p, err := parseMinioKey(&o) + if err != nil { + continue + } + + res = append(res, p) + lastKey = o.Key + + elementsNum-- + if elementsNum == 0 { + break + } + } + return res, lastKey +} + +func parseMinioKey(o *minio.ObjectInfo) (paths.AllPathParams, error) { + const op errors.Op = "minio.parseMinioKey" + parts := strings.Split(o.Key, "/") + v := parts[len(parts)-2] + m := strings.Replace(o.Key, v, "", -2) + m = strings.Replace(m, "//.info", "", -1) + if m == "" || v == "" { + return paths.AllPathParams{}, errors.E(op, fmt.Errorf("invalid object key format %s", o.Key)) + } + return paths.AllPathParams{m, v}, nil +} diff --git a/pkg/storage/minio/lister.go b/pkg/storage/minio/lister.go index 31f94dc6b..5af3554a3 100644 --- a/pkg/storage/minio/lister.go +++ b/pkg/storage/minio/lister.go @@ -18,8 +18,9 @@ func (l *storageImpl) List(ctx context.Context, module string) ([]string, error) doneCh := make(chan struct{}) defer close(doneCh) searchPrefix := module + "/" - objectCh := l.minioClient.ListObjectsV2(l.bucketName, searchPrefix, false, doneCh) - for object := range objectCh { + objectCh, _ := l.minioCore.ListObjectsV2(l.bucketName, searchPrefix, "", false, "", 0, "") + + for _, object := range objectCh.Contents { if object.Err != nil { return nil, errors.E(op, object.Err, errors.M(module)) } diff --git a/pkg/storage/minio/minio.go b/pkg/storage/minio/minio.go index 01a87be10..d70ae237d 100644 --- a/pkg/storage/minio/minio.go +++ b/pkg/storage/minio/minio.go @@ -12,6 +12,7 @@ import ( type storageImpl struct { minioClient *minio.Client + minioCore *minio.Core bucketName string } @@ -29,6 +30,7 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen bucketName := conf.Bucket region := conf.Region useSSL := conf.EnableSSL + minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, useSSL) minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, useSSL) if err != nil { return nil, errors.E(op, err) @@ -42,5 +44,5 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen return nil, errors.E(op, err) } } - return &storageImpl{minioClient, bucketName}, nil + return &storageImpl{minioClient, minioCore, bucketName}, nil } diff --git a/pkg/storage/minio/minio_test.go b/pkg/storage/minio/minio_test.go index 6fd14f70d..3af29d47a 100644 --- a/pkg/storage/minio/minio_test.go +++ b/pkg/storage/minio/minio_test.go @@ -19,10 +19,8 @@ func BenchmarkBackend(b *testing.B) { } func (s *storageImpl) clear() error { - doneCh := make(chan struct{}) - defer close(doneCh) - objectCh := s.minioClient.ListObjectsV2(s.bucketName, "", true, doneCh) - for object := range objectCh { + objectCh, _ := s.minioCore.ListObjectsV2(s.bucketName, "", "", false, "", 0, "") + for _, object := range objectCh.Contents { if object.Err != nil { return object.Err }