Skip to content

Commit df479ba

Browse files
fix(aliyundrive_open): limit rate for every request (close #724) (#1011)
* fix(aliyundrive_open): limit rate for `Remove` and `MakeDir`; reduce limit for `List` and `Link` (close #724) * Update drivers/aliyundrive_open/driver.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: 火星大王 <34576789+huoxingdawang@users.noreply.github.com> * Update drivers/aliyundrive_open/driver.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: 火星大王 <34576789+huoxingdawang@users.noreply.github.com> * fix(aliyundrive_open): limit rate for every request * fix(aliyundrive_open): fix limiter not work on reference driver * fix(aliyundrive_open): typo * fix(aliyundrive_open): limiter not set to nil after free * fix(aliyundrive_share): limit rate for every request --------- Signed-off-by: 火星大王 <34576789+huoxingdawang@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5ae8e96 commit df479ba

File tree

9 files changed

+259
-105
lines changed

9 files changed

+259
-105
lines changed

drivers/aliyundrive_open/driver.go

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package aliyundrive_open
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"net/http"
87
"path/filepath"
98
"time"
@@ -13,7 +12,6 @@ import (
1312
"github.com/OpenListTeam/OpenList/v4/internal/errs"
1413
"github.com/OpenListTeam/OpenList/v4/internal/model"
1514
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
16-
"github.com/OpenListTeam/rateg"
1715
"github.com/go-resty/resty/v2"
1816
log "github.com/sirupsen/logrus"
1917
)
@@ -24,9 +22,8 @@ type AliyundriveOpen struct {
2422

2523
DriveId string
2624

27-
limitList func(ctx context.Context, data base.Json) (*Files, error)
28-
limitLink func(ctx context.Context, file model.Obj) (*model.Link, error)
29-
ref *AliyundriveOpen
25+
limiter *limiter
26+
ref *AliyundriveOpen
3027
}
3128

3229
func (d *AliyundriveOpen) Config() driver.Config {
@@ -38,25 +35,23 @@ func (d *AliyundriveOpen) GetAddition() driver.Additional {
3835
}
3936

4037
func (d *AliyundriveOpen) Init(ctx context.Context) error {
38+
d.limiter = getLimiterForUser(globalLimiterUserID) // First create a globally shared limiter to limit the initial requests.
4139
if d.LIVPDownloadFormat == "" {
4240
d.LIVPDownloadFormat = "jpeg"
4341
}
4442
if d.DriveType == "" {
4543
d.DriveType = "default"
4644
}
47-
res, err := d.request("/adrive/v1.0/user/getDriveInfo", http.MethodPost, nil)
45+
res, err := d.request(ctx, limiterOther, "/adrive/v1.0/user/getDriveInfo", http.MethodPost, nil)
4846
if err != nil {
47+
d.limiter.free()
48+
d.limiter = nil
4949
return err
5050
}
5151
d.DriveId = utils.Json.Get(res, d.DriveType+"_drive_id").ToString()
52-
d.limitList = rateg.LimitFnCtx(d.list, rateg.LimitFnOption{
53-
Limit: 4,
54-
Bucket: 1,
55-
})
56-
d.limitLink = rateg.LimitFnCtx(d.link, rateg.LimitFnOption{
57-
Limit: 1,
58-
Bucket: 1,
59-
})
52+
userid := utils.Json.Get(res, "user_id").ToString()
53+
d.limiter.free()
54+
d.limiter = getLimiterForUser(userid) // Allocate a corresponding limiter for each user.
6055
return nil
6156
}
6257

@@ -70,6 +65,8 @@ func (d *AliyundriveOpen) InitReference(storage driver.Driver) error {
7065
}
7166

7267
func (d *AliyundriveOpen) Drop(ctx context.Context) error {
68+
d.limiter.free()
69+
d.limiter = nil
7370
d.ref = nil
7471
return nil
7572
}
@@ -87,9 +84,6 @@ func (d *AliyundriveOpen) GetRoot(ctx context.Context) (model.Obj, error) {
8784
}
8885

8986
func (d *AliyundriveOpen) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
90-
if d.limitList == nil {
91-
return nil, fmt.Errorf("driver not init")
92-
}
9387
files, err := d.getFiles(ctx, dir.GetID())
9488
if err != nil {
9589
return nil, err
@@ -107,8 +101,8 @@ func (d *AliyundriveOpen) List(ctx context.Context, dir model.Obj, args model.Li
107101
return objs, err
108102
}
109103

110-
func (d *AliyundriveOpen) link(ctx context.Context, file model.Obj) (*model.Link, error) {
111-
res, err := d.request("/adrive/v1.0/openFile/getDownloadUrl", http.MethodPost, func(req *resty.Request) {
104+
func (d *AliyundriveOpen) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
105+
res, err := d.request(ctx, limiterLink, "/adrive/v1.0/openFile/getDownloadUrl", http.MethodPost, func(req *resty.Request) {
112106
req.SetBody(base.Json{
113107
"drive_id": d.DriveId,
114108
"file_id": file.GetID(),
@@ -132,17 +126,10 @@ func (d *AliyundriveOpen) link(ctx context.Context, file model.Obj) (*model.Link
132126
}, nil
133127
}
134128

135-
func (d *AliyundriveOpen) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
136-
if d.limitLink == nil {
137-
return nil, fmt.Errorf("driver not init")
138-
}
139-
return d.limitLink(ctx, file)
140-
}
141-
142129
func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) {
143130
nowTime, _ := getNowTime()
144131
newDir := File{CreatedAt: nowTime, UpdatedAt: nowTime}
145-
_, err := d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
132+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
146133
req.SetBody(base.Json{
147134
"drive_id": d.DriveId,
148135
"parent_file_id": parentDir.GetID(),
@@ -168,7 +155,7 @@ func (d *AliyundriveOpen) MakeDir(ctx context.Context, parentDir model.Obj, dirN
168155

169156
func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
170157
var resp MoveOrCopyResp
171-
_, err := d.request("/adrive/v1.0/openFile/move", http.MethodPost, func(req *resty.Request) {
158+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/move", http.MethodPost, func(req *resty.Request) {
172159
req.SetBody(base.Json{
173160
"drive_id": d.DriveId,
174161
"file_id": srcObj.GetID(),
@@ -198,7 +185,7 @@ func (d *AliyundriveOpen) Move(ctx context.Context, srcObj, dstDir model.Obj) (m
198185

199186
func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) {
200187
var newFile File
201-
_, err := d.request("/adrive/v1.0/openFile/update", http.MethodPost, func(req *resty.Request) {
188+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/update", http.MethodPost, func(req *resty.Request) {
202189
req.SetBody(base.Json{
203190
"drive_id": d.DriveId,
204191
"file_id": srcObj.GetID(),
@@ -230,7 +217,7 @@ func (d *AliyundriveOpen) Rename(ctx context.Context, srcObj model.Obj, newName
230217

231218
func (d *AliyundriveOpen) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
232219
var resp MoveOrCopyResp
233-
_, err := d.request("/adrive/v1.0/openFile/copy", http.MethodPost, func(req *resty.Request) {
220+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/copy", http.MethodPost, func(req *resty.Request) {
234221
req.SetBody(base.Json{
235222
"drive_id": d.DriveId,
236223
"file_id": srcObj.GetID(),
@@ -256,7 +243,7 @@ func (d *AliyundriveOpen) Remove(ctx context.Context, obj model.Obj) error {
256243
if d.RemoveWay == "delete" {
257244
uri = "/adrive/v1.0/openFile/delete"
258245
}
259-
_, err := d.request(uri, http.MethodPost, func(req *resty.Request) {
246+
_, err := d.request(ctx, limiterOther, uri, http.MethodPost, func(req *resty.Request) {
260247
req.SetBody(base.Json{
261248
"drive_id": d.DriveId,
262249
"file_id": obj.GetID(),
@@ -295,7 +282,7 @@ func (d *AliyundriveOpen) Other(ctx context.Context, args model.OtherArgs) (inte
295282
default:
296283
return nil, errs.NotSupport
297284
}
298-
_, err := d.request(uri, http.MethodPost, func(req *resty.Request) {
285+
_, err := d.request(ctx, limiterOther, uri, http.MethodPost, func(req *resty.Request) {
299286
req.SetBody(data).SetResult(&resp)
300287
})
301288
if err != nil {

drivers/aliyundrive_open/limiter.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package aliyundrive_open
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"golang.org/x/time/rate"
9+
)
10+
11+
// See document https://www.yuque.com/aliyundrive/zpfszx/mqocg38hlxzc5vcd
12+
// See issue https://github.com/OpenListTeam/OpenList/issues/724
13+
// We got limit per user per app, so the limiter should be global.
14+
15+
type limiterType int
16+
17+
const (
18+
limiterList limiterType = iota
19+
limiterLink
20+
limiterOther
21+
)
22+
23+
const (
24+
listRateLimit = 3.9 // 4 per second in document, but we use 3.9 per second to be safe
25+
linkRateLimit = 0.9 // 1 per second in document, but we use 0.9 per second to be safe
26+
otherRateLimit = 14.9 // 15 per second in document, but we use 14.9 per second to be safe
27+
globalLimiterUserID = "" // Global limiter user ID, used to limit the initial requests.
28+
)
29+
30+
type limiter struct {
31+
usedBy int
32+
list *rate.Limiter
33+
link *rate.Limiter
34+
other *rate.Limiter
35+
}
36+
37+
var limiters = make(map[string]*limiter)
38+
var limitersLock = &sync.Mutex{}
39+
40+
func getLimiterForUser(userid string) *limiter {
41+
limitersLock.Lock()
42+
defer limitersLock.Unlock()
43+
defer func() {
44+
// Clean up limiters that are no longer used.
45+
for id, lim := range limiters {
46+
if lim.usedBy <= 0 && id != globalLimiterUserID { // Do not delete the global limiter.
47+
delete(limiters, id)
48+
}
49+
}
50+
}()
51+
if lim, ok := limiters[userid]; ok {
52+
lim.usedBy++
53+
return lim
54+
}
55+
lim := &limiter{
56+
usedBy: 1,
57+
list: rate.NewLimiter(rate.Limit(listRateLimit), 1),
58+
link: rate.NewLimiter(rate.Limit(linkRateLimit), 1),
59+
other: rate.NewLimiter(rate.Limit(otherRateLimit), 1),
60+
}
61+
limiters[userid] = lim
62+
return lim
63+
}
64+
65+
func (l *limiter) wait(ctx context.Context, typ limiterType) error {
66+
if l == nil {
67+
return fmt.Errorf("driver not init")
68+
}
69+
switch typ {
70+
case limiterList:
71+
return l.list.Wait(ctx)
72+
case limiterLink:
73+
return l.link.Wait(ctx)
74+
case limiterOther:
75+
return l.other.Wait(ctx)
76+
default:
77+
return fmt.Errorf("unknown limiter type")
78+
}
79+
}
80+
func (l *limiter) free() {
81+
if l == nil {
82+
return
83+
}
84+
limitersLock.Lock()
85+
defer limitersLock.Unlock()
86+
l.usedBy--
87+
}
88+
func (d *AliyundriveOpen) wait(ctx context.Context, typ limiterType) error {
89+
if d == nil {
90+
return fmt.Errorf("driver not init")
91+
}
92+
if d.ref != nil {
93+
return d.ref.wait(ctx, typ) // If this is a reference driver, wait on the reference driver.
94+
}
95+
return d.limiter.wait(ctx, typ)
96+
}

drivers/aliyundrive_open/upload.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ func calPartSize(fileSize int64) int64 {
5050
return partSize
5151
}
5252

53-
func (d *AliyundriveOpen) getUploadUrl(count int, fileId, uploadId string) ([]PartInfo, error) {
53+
func (d *AliyundriveOpen) getUploadUrl(ctx context.Context, count int, fileId, uploadId string) ([]PartInfo, error) {
5454
partInfoList := makePartInfos(count)
5555
var resp CreateResp
56-
_, err := d.request("/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) {
56+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/getUploadUrl", http.MethodPost, func(req *resty.Request) {
5757
req.SetBody(base.Json{
5858
"drive_id": d.DriveId,
5959
"file_id": fileId,
@@ -84,10 +84,10 @@ func (d *AliyundriveOpen) uploadPart(ctx context.Context, r io.Reader, partInfo
8484
return nil
8585
}
8686

87-
func (d *AliyundriveOpen) completeUpload(fileId, uploadId string) (model.Obj, error) {
87+
func (d *AliyundriveOpen) completeUpload(ctx context.Context, fileId, uploadId string) (model.Obj, error) {
8888
// 3. complete
8989
var newFile File
90-
_, err := d.request("/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
90+
_, err := d.request(ctx, limiterOther, "/adrive/v1.0/openFile/complete", http.MethodPost, func(req *resty.Request) {
9191
req.SetBody(base.Json{
9292
"drive_id": d.DriveId,
9393
"file_id": fileId,
@@ -180,7 +180,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
180180
createData["pre_hash"] = hash
181181
}
182182
var createResp CreateResp
183-
_, err, e := d.requestReturnErrResp("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
183+
_, err, e := d.requestReturnErrResp(ctx, limiterOther, "/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
184184
req.SetBody(createData).SetResult(&createResp)
185185
})
186186
if err != nil {
@@ -207,7 +207,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
207207
if err != nil {
208208
return nil, fmt.Errorf("cal proof code error: %s", err.Error())
209209
}
210-
_, err = d.request("/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
210+
_, err = d.request(ctx, limiterOther, "/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
211211
req.SetBody(createData).SetResult(&createResp)
212212
})
213213
if err != nil {
@@ -232,7 +232,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
232232
}
233233
// refresh upload url if 50 minutes passed
234234
if time.Since(preTime) > 50*time.Minute {
235-
createResp.PartInfoList, err = d.getUploadUrl(count, createResp.FileId, createResp.UploadId)
235+
createResp.PartInfoList, err = d.getUploadUrl(ctx, count, createResp.FileId, createResp.UploadId)
236236
if err != nil {
237237
return nil, err
238238
}
@@ -266,5 +266,5 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
266266

267267
log.Debugf("[aliyundrive_open] create file success, resp: %+v", createResp)
268268
// 3. complete
269-
return d.completeUpload(createResp.FileId, createResp.UploadId)
269+
return d.completeUpload(ctx, createResp.FileId, createResp.UploadId)
270270
}

0 commit comments

Comments
 (0)