Skip to content

Commit

Permalink
Add catalog to minio (#1040)
Browse files Browse the repository at this point in the history
* Implement cataloger interface for Minio

* Catalog fix

* Implemented Catalog method in the Minio storage package

* code fmt fix

* fmt fix

* remove unused channel
  • Loading branch information
Zyqsempai authored and Aaron Schlesinger committed Feb 16, 2019
1 parent c2647da commit 4dfa993
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 7 deletions.
84 changes: 84 additions & 0 deletions pkg/storage/minio/cataloger.go
@@ -0,0 +1,84 @@
package minio

import (
"context"
"fmt"
"strings"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/paths"
"github.com/minio/minio-go"
)

// Catalog implements the (./pkg/storage).Cataloger interface
// It returns a list of modules and versions contained in the storage
func (s *storageImpl) Catalog(ctx context.Context, token string, pageSize int) ([]paths.AllPathParams, string, error) {
const op errors.Op = "minio.Catalog"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
queryToken := token
res := make([]paths.AllPathParams, 0)
startAfter := token
token = ""

count := pageSize
for count > 0 {
loo, err := s.minioCore.ListObjectsV2(s.bucketName, token, "", false, "", 0, startAfter)
if err != nil {
return nil, "", errors.E(op, err)
}

m, lastKey := fetchModsAndVersions(loo.Contents, count)

res = append(res, m...)
count -= len(m)
queryToken = lastKey

if !loo.IsTruncated { // not truncated, there is no point in asking more
if count > 0 { // it means we reached the end, no subsequent requests are necessary
queryToken = ""
}
break
}
}

return res, queryToken, nil
}

func fetchModsAndVersions(objects []minio.ObjectInfo, elementsNum int) ([]paths.AllPathParams, string) {
res := make([]paths.AllPathParams, 0)
lastKey := ""

for _, o := range objects {
if !strings.HasSuffix(o.Key, ".info") {
continue
}

p, err := parseMinioKey(&o)
if err != nil {
continue
}

res = append(res, p)
lastKey = o.Key

elementsNum--
if elementsNum == 0 {
break
}
}
return res, lastKey
}

func parseMinioKey(o *minio.ObjectInfo) (paths.AllPathParams, error) {
const op errors.Op = "minio.parseMinioKey"
parts := strings.Split(o.Key, "/")
v := parts[len(parts)-2]
m := strings.Replace(o.Key, v, "", -2)
m = strings.Replace(m, "//.info", "", -1)
if m == "" || v == "" {
return paths.AllPathParams{}, errors.E(op, fmt.Errorf("invalid object key format %s", o.Key))
}
return paths.AllPathParams{m, v}, nil
}
5 changes: 3 additions & 2 deletions pkg/storage/minio/lister.go
Expand Up @@ -18,8 +18,9 @@ func (l *storageImpl) List(ctx context.Context, module string) ([]string, error)
doneCh := make(chan struct{})
defer close(doneCh)
searchPrefix := module + "/"
objectCh := l.minioClient.ListObjectsV2(l.bucketName, searchPrefix, false, doneCh)
for object := range objectCh {
objectCh, _ := l.minioCore.ListObjectsV2(l.bucketName, searchPrefix, "", false, "", 0, "")

for _, object := range objectCh.Contents {
if object.Err != nil {
return nil, errors.E(op, object.Err, errors.M(module))
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/minio/minio.go
Expand Up @@ -12,6 +12,7 @@ import (

type storageImpl struct {
minioClient *minio.Client
minioCore *minio.Core
bucketName string
}

Expand All @@ -29,6 +30,7 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen
bucketName := conf.Bucket
region := conf.Region
useSSL := conf.EnableSSL
minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, useSSL)
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, useSSL)
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -42,5 +44,5 @@ func NewStorage(conf *config.MinioConfig, timeout time.Duration) (storage.Backen
return nil, errors.E(op, err)
}
}
return &storageImpl{minioClient, bucketName}, nil
return &storageImpl{minioClient, minioCore, bucketName}, nil
}
6 changes: 2 additions & 4 deletions pkg/storage/minio/minio_test.go
Expand Up @@ -19,10 +19,8 @@ func BenchmarkBackend(b *testing.B) {
}

func (s *storageImpl) clear() error {
doneCh := make(chan struct{})
defer close(doneCh)
objectCh := s.minioClient.ListObjectsV2(s.bucketName, "", true, doneCh)
for object := range objectCh {
objectCh, _ := s.minioCore.ListObjectsV2(s.bucketName, "", "", false, "", 0, "")
for _, object := range objectCh.Contents {
if object.Err != nil {
return object.Err
}
Expand Down

0 comments on commit 4dfa993

Please sign in to comment.