Skip to content

Commit

Permalink
feat: import object to seed peer with max replicas (#1413)
Browse files Browse the repository at this point in the history
* feat: create object storage with max replicas

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: import object storage with max replicas

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: add strings.Unique func

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jun 27, 2022
1 parent c3164fe commit 65dc29a
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 23 deletions.
8 changes: 8 additions & 0 deletions client/config/peerhost.go
Expand Up @@ -139,6 +139,12 @@ func (p *DaemonOption) Validate() error {
return errors.Errorf("rate limit must be greater than %s", DefaultMinRate.String())
}

if p.ObjectStorage.Enable {
if p.ObjectStorage.MaxReplicas <= 0 {
return errors.New("max replicas must be greater than 0")
}
}

switch p.Download.DefaultPattern {
case PatternP2P, PatternSeedPeer, PatternSource:
default:
Expand Down Expand Up @@ -370,6 +376,8 @@ type ObjectStorageOption struct {
// filtering unnecessary query params in the URL,
// it is separated by & character.
Filter string `mapstructure:"filter" yaml:"filter"`
// MaxReplicas is the maximum number of replicas of an object cache in seed peers.
MaxReplicas int `mapstructure:"maxReplicas" yaml:"maxReplicas"`
// ListenOption is object storage service listener.
ListenOption `yaml:",inline" mapstructure:",squash"`
}
Expand Down
5 changes: 3 additions & 2 deletions client/config/peerhost_darwin.go
Expand Up @@ -119,8 +119,9 @@ var peerHostConfig = DaemonOption{
},
},
ObjectStorage: ObjectStorageOption{
Enable: false,
Filter: "Expires&Signature&ns",
Enable: false,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
Expand Down
5 changes: 3 additions & 2 deletions client/config/peerhost_linux.go
Expand Up @@ -118,8 +118,9 @@ var peerHostConfig = DaemonOption{
},
},
ObjectStorage: ObjectStorageOption{
Enable: false,
Filter: "Expires&Signature&ns",
Enable: false,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
Expand Down
5 changes: 3 additions & 2 deletions client/config/peerhost_test.go
Expand Up @@ -327,8 +327,9 @@ func TestPeerHostOption_Load(t *testing.T) {
},
},
ObjectStorage: ObjectStorageOption{
Enable: true,
Filter: "Expires&Signature&ns",
Enable: true,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
Expand Down
1 change: 1 addition & 0 deletions client/config/testdata/config/daemon.yaml
Expand Up @@ -73,6 +73,7 @@ upload:
objectStorage:
enable: true
filter: Expires&Signature&ns
maxReplicas: 3
security:
insecure: true
caCert: caCert
Expand Down
41 changes: 28 additions & 13 deletions client/daemon/objectstorage/objectstorage.go
Expand Up @@ -47,6 +47,7 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/objectstorage"
"d7y.io/dragonfly/v2/pkg/rpc/base"
pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
)

const (
Expand Down Expand Up @@ -307,11 +308,12 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
}

var (
bucketName = params.ID
objectKey = form.Key
mode = form.Mode
filter = form.Filter
fileHeader = form.File
bucketName = params.ID
objectKey = form.Key
mode = form.Mode
filter = form.Filter
maxReplicas = form.MaxReplicas
fileHeader = form.File
)

client, err := o.client()
Expand Down Expand Up @@ -346,6 +348,11 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
urlMeta.Filter = filter
}

// Initialize max replicas.
if maxReplicas == 0 {
maxReplicas = o.config.ObjectStorage.MaxReplicas
}

// Initialize task id and peer id.
taskID := idgen.TaskID(signURL, urlMeta)
peerID := o.peerIDGenerator.PeerID()
Expand Down Expand Up @@ -380,9 +387,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
case WriteBack:
// Import object to seed peer.
go func() {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
}
}()

Expand All @@ -399,9 +405,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
case AsyncWriteBack:
// Import object to seed peer.
go func() {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
}
}()

Expand Down Expand Up @@ -477,7 +482,7 @@ func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID,
}

// importObjectToSeedPeers uses to import object to available seed peers.
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader) error {
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error {
schedulers, err := o.dynconfig.GetSchedulers()
if err != nil {
return err
Expand All @@ -491,13 +496,23 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName,
}
}
}
seedPeerHosts = pkgstrings.Unique(seedPeerHosts)

var replicas int
for _, seedPeerHost := range seedPeerHosts {
log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost)
if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, mode, fileHeader); err != nil {
return err
log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err)
continue
}

replicas++
if replicas >= maxReplicas {
break
}
}

log.Infof("import %d object %s to seed peers", replicas, objectKey)
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions client/daemon/objectstorage/types.go
Expand Up @@ -28,10 +28,11 @@ type CreateObjectParams struct {
}

// CreateObjectRequset is the multipart-form request body for creating an
// object, bound via gin "form" tags.
// NOTE(review): the type name misspells "Request"; renaming would break
// callers, so it is left as-is — consider a deprecation alias later.
type CreateObjectRequset struct {
// Key is the object key within the bucket; required.
Key string `form:"key" binding:"required"`
// Mode selects the write mode; defaults to 0 and must be in [0, 2].
Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"`
// Filter lists query params to strip from the URL, separated by '&';
// optional.
Filter string `form:"filter" binding:"omitempty"`
// MaxReplicas caps how many seed peers receive a replica; optional,
// must be in (0, 100] when set. Zero means "use the daemon default"
// (see ObjectStorageOption.MaxReplicas).
MaxReplicas int `form:"maxReplicas" binding:"omitempty,gt=0,lte=100"`
// File is the uploaded object content; required.
File *multipart.FileHeader `form:"file" binding:"required"`
}

type GetObjectQuery struct {
Expand Down
14 changes: 14 additions & 0 deletions pkg/strings/strings.go
Expand Up @@ -35,3 +35,17 @@ func Contains(slice []string, ele string) bool {

return false
}

// Remove the duplicate elements in the string slice
// Unique removes duplicate elements from the string slice, preserving the
// order of first occurrence. It always returns a non-nil slice, even for
// empty or nil input.
func Unique(slice []string) []string {
	// map[string]struct{} is the idiomatic zero-width set type; pre-size
	// both the set and the result to avoid growth reallocations.
	seen := make(map[string]struct{}, len(slice))
	result := make([]string, 0, len(slice))
	for _, entry := range slice {
		if _, ok := seen[entry]; ok {
			continue
		}
		seen[entry] = struct{}{}
		result = append(result, entry)
	}

	return result
}
8 changes: 8 additions & 0 deletions pkg/strings/strings_test.go
Expand Up @@ -32,3 +32,11 @@ func TestContains(t *testing.T) {
assert.True(t, Contains([]string{"a", "B"}, "B"))
assert.False(t, Contains([]string{"a", "B"}, "b"))
}

// TestUnique verifies de-duplication, first-occurrence ordering, and the
// empty-input case. The original had the empty-slice assertion duplicated
// verbatim (copy-paste); it is replaced with a single-element case. testify's
// documented argument order is (t, expected, actual), which is also fixed.
func TestUnique(t *testing.T) {
	// Already-unique input is returned unchanged.
	assert.EqualValues(t, []string{"a", "B"}, Unique([]string{"a", "B"}))
	// Adjacent duplicates are collapsed.
	assert.EqualValues(t, []string{"a", "B"}, Unique([]string{"a", "a", "B", "B"}))
	// Non-adjacent duplicates are collapsed, keeping first-occurrence order.
	assert.EqualValues(t, []string{"a", "B"}, Unique([]string{"a", "B", "a", "B"}))
	// Single element passes through.
	assert.EqualValues(t, []string{"a"}, Unique([]string{"a"}))
	// Empty input yields an empty slice.
	assert.EqualValues(t, []string{}, Unique([]string{}))
}

0 comments on commit 65dc29a

Please sign in to comment.