
Commit ddffacf

j2rong4cnxrgzs and MadDogOwner authored
perf: optimize IO read/write usage (#8243)
* perf: optimize IO read/write usage
* .
* Update drivers/139/driver.go

Co-authored-by: MadDogOwner <xiaoran@xrgzs.top>
---------
Co-authored-by: MadDogOwner <xiaoran@xrgzs.top>
1 parent 3375c26 commit ddffacf

File tree

29 files changed: +429 −343 lines changed


drivers/115/util.go

Lines changed: 1 addition & 1 deletion
@@ -405,7 +405,7 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
 			if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
 				continue
 			}
-			if part, err = bucket.UploadPart(imur, driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(buf)),
+			if part, err = bucket.UploadPart(imur, driver.NewLimitedUploadStream(ctx, bytes.NewReader(buf)),
 				chunk.Size, chunk.Number, driver115.OssOption(params, ossToken)...); err == nil {
 				break
 			}
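A note on the change above: a bytes.Buffer is drained as it is read, so if the OSS part upload is retried there is nothing left to send, while a bytes.Reader is a read-only, seekable view over the same slice. A minimal standalone sketch of that difference (not alist code):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	data := []byte("chunk payload")

	// bytes.Buffer is consumed by reads: a retry would see no data left.
	buf := bytes.NewBuffer(data)
	io.Copy(io.Discard, buf)
	fmt.Println("buffer bytes left after first send:", buf.Len()) // 0

	// bytes.Reader can be rewound, so the same chunk can be re-sent on retry.
	r := bytes.NewReader(data)
	io.Copy(io.Discard, r)
	r.Seek(0, io.SeekStart)
	n, _ := io.Copy(io.Discard, r)
	fmt.Println("bytes available on retry:", n) // 13
}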

drivers/123/driver.go

Lines changed: 3 additions & 18 deletions
@@ -2,11 +2,8 @@ package _123
 
 import (
 	"context"
-	"crypto/md5"
 	"encoding/base64"
-	"encoding/hex"
 	"fmt"
-	"io"
 	"net/http"
 	"net/url"
 	"sync"
@@ -18,6 +15,7 @@ import (
 	"github.com/alist-org/alist/v3/internal/driver"
 	"github.com/alist-org/alist/v3/internal/errs"
 	"github.com/alist-org/alist/v3/internal/model"
+	"github.com/alist-org/alist/v3/internal/stream"
 	"github.com/alist-org/alist/v3/pkg/utils"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/credentials"
@@ -187,25 +185,12 @@ func (d *Pan123) Remove(ctx context.Context, obj model.Obj) error {
 
 func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
 	etag := file.GetHash().GetHash(utils.MD5)
+	var err error
 	if len(etag) < utils.MD5.Width {
-		// const DEFAULT int64 = 10485760
-		h := md5.New()
-		// need to calculate md5 of the full content
-		tempFile, err := file.CacheFullInTempFile()
+		_, etag, err = stream.CacheFullInTempFileAndHash(file, utils.MD5)
 		if err != nil {
 			return err
 		}
-		defer func() {
-			_ = tempFile.Close()
-		}()
-		if _, err = utils.CopyWithBuffer(h, tempFile); err != nil {
-			return err
-		}
-		_, err = tempFile.Seek(0, io.SeekStart)
-		if err != nil {
-			return err
-		}
-		etag = hex.EncodeToString(h.Sum(nil))
 	}
 	data := base.Json{
 		"driveId": 0,

drivers/123/upload.go

Lines changed: 20 additions & 14 deletions
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"math"
 	"net/http"
 	"strconv"
 
@@ -70,27 +69,33 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
 }
 
 func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, up driver.UpdateProgress) error {
-	chunkSize := int64(1024 * 1024 * 16)
+	tmpF, err := file.CacheFullInTempFile()
+	if err != nil {
+		return err
+	}
 	// fetch s3 pre signed urls
-	chunkCount := int(math.Ceil(float64(file.GetSize()) / float64(chunkSize)))
+	size := file.GetSize()
+	chunkSize := min(size, 16*utils.MB)
+	chunkCount := int(size / chunkSize)
+	lastChunkSize := size % chunkSize
+	if lastChunkSize > 0 {
+		chunkCount++
+	} else {
+		lastChunkSize = chunkSize
+	}
 	// only 1 batch is allowed
-	isMultipart := chunkCount > 1
 	batchSize := 1
 	getS3UploadUrl := d.getS3Auth
-	if isMultipart {
+	if chunkCount > 1 {
 		batchSize = 10
 		getS3UploadUrl = d.getS3PreSignedUrls
 	}
-	limited := driver.NewLimitedUploadStream(ctx, file)
 	for i := 1; i <= chunkCount; i += batchSize {
 		if utils.IsCanceled(ctx) {
 			return ctx.Err()
 		}
 		start := i
-		end := i + batchSize
-		if end > chunkCount+1 {
-			end = chunkCount + 1
-		}
+		end := min(i+batchSize, chunkCount+1)
 		s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
 		if err != nil {
 			return err
@@ -102,9 +107,9 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
 			}
 			curSize := chunkSize
 			if j == chunkCount {
-				curSize = file.GetSize() - (int64(chunkCount)-1)*chunkSize
+				curSize = lastChunkSize
 			}
-			err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(limited, chunkSize), curSize, false, getS3UploadUrl)
+			err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize), curSize, false, getS3UploadUrl)
 			if err != nil {
 				return err
 			}
@@ -115,12 +120,12 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
 	return d.completeS3(ctx, upReq, file, chunkCount > 1)
 }
 
-func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
+func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader *io.SectionReader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
 	uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
 	if uploadUrl == "" {
 		return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
 	}
-	req, err := http.NewRequest("PUT", uploadUrl, reader)
+	req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, reader))
 	if err != nil {
 		return err
 	}
@@ -143,6 +148,7 @@ func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSign
 		}
 		s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
 		// retry
+		reader.Seek(0, io.SeekStart)
 		return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
 	}
 	if res.StatusCode != http.StatusOK {
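Because newUpload now caches the stream to tmpF first, every chunk can be handed to uploadS3Chunk as an *io.SectionReader over its own byte range instead of an io.LimitReader over one shared stream; that is what makes the reader.Seek(0, io.SeekStart) before the retry meaningful. A standalone sketch of the pattern, with sendChunk standing in for the real pre-signed PUT:

package main

import (
	"fmt"
	"io"
	"os"
)

// sendChunk stands in for the real per-chunk PUT request; it may be retried.
func sendChunk(r io.Reader) error {
	_, err := io.Copy(io.Discard, r)
	return err
}

func main() {
	tmpF, err := os.CreateTemp("", "cache-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmpF.Name())
	tmpF.WriteString("0123456789abcdef0123456789abcdef0123") // 36 bytes of cached stream

	size, chunkSize := int64(36), int64(16)
	chunkCount := int(size / chunkSize)
	lastChunkSize := size % chunkSize
	if lastChunkSize > 0 {
		chunkCount++
	} else {
		lastChunkSize = chunkSize
	}

	for j := 1; j <= chunkCount; j++ {
		curSize := chunkSize
		if j == chunkCount {
			curSize = lastChunkSize
		}
		// Each chunk gets its own seekable window into the cached file.
		sec := io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize)
		if err := sendChunk(sec); err != nil {
			// On retry, rewind only this chunk and send the same bytes again.
			sec.Seek(0, io.SeekStart)
			_ = sendChunk(sec)
		}
		fmt.Printf("chunk %d: %d bytes\n", j, curSize)
	}
}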

drivers/139/driver.go

Lines changed: 48 additions & 58 deletions
@@ -2,20 +2,19 @@ package _139
 
 import (
 	"context"
-	"encoding/base64"
 	"encoding/xml"
 	"fmt"
 	"io"
 	"net/http"
 	"path"
 	"strconv"
-	"strings"
 	"time"
 
 	"github.com/alist-org/alist/v3/drivers/base"
 	"github.com/alist-org/alist/v3/internal/driver"
 	"github.com/alist-org/alist/v3/internal/errs"
 	"github.com/alist-org/alist/v3/internal/model"
+	streamPkg "github.com/alist-org/alist/v3/internal/stream"
 	"github.com/alist-org/alist/v3/pkg/cron"
 	"github.com/alist-org/alist/v3/pkg/utils"
 	"github.com/alist-org/alist/v3/pkg/utils/random"
@@ -72,28 +71,29 @@ func (d *Yun139) Init(ctx context.Context) error {
 	default:
 		return errs.NotImplement
 	}
-	if d.ref != nil {
-		return nil
-	}
-	decode, err := base64.StdEncoding.DecodeString(d.Authorization)
-	if err != nil {
-		return err
-	}
-	decodeStr := string(decode)
-	splits := strings.Split(decodeStr, ":")
-	if len(splits) < 2 {
-		return fmt.Errorf("authorization is invalid, splits < 2")
-	}
-	d.Account = splits[1]
-	_, err = d.post("/orchestration/personalCloud/user/v1.0/qryUserExternInfo", base.Json{
-		"qryUserExternInfoReq": base.Json{
-			"commonAccountInfo": base.Json{
-				"account":     d.getAccount(),
-				"accountType": 1,
-			},
-		},
-	}, nil)
-	return err
+	// if d.ref != nil {
+	// 	return nil
+	// }
+	// decode, err := base64.StdEncoding.DecodeString(d.Authorization)
+	// if err != nil {
+	// 	return err
+	// }
+	// decodeStr := string(decode)
+	// splits := strings.Split(decodeStr, ":")
+	// if len(splits) < 2 {
+	// 	return fmt.Errorf("authorization is invalid, splits < 2")
+	// }
+	// d.Account = splits[1]
+	// _, err = d.post("/orchestration/personalCloud/user/v1.0/qryUserExternInfo", base.Json{
+	// 	"qryUserExternInfoReq": base.Json{
+	// 		"commonAccountInfo": base.Json{
+	// 			"account":     d.getAccount(),
+	// 			"accountType": 1,
+	// 		},
+	// 	},
+	// }, nil)
+	// return err
+	return nil
 }
 
 func (d *Yun139) InitReference(storage driver.Driver) error {
@@ -503,53 +503,44 @@ func (d *Yun139) Remove(ctx context.Context, obj model.Obj) error {
 	}
 }
 
-const (
-	_  = iota //ignore first value by assigning to blank identifier
-	KB = 1 << (10 * iota)
-	MB
-	GB
-	TB
-)
-
 func (d *Yun139) getPartSize(size int64) int64 {
 	if d.CustomUploadPartSize != 0 {
 		return d.CustomUploadPartSize
 	}
 	// 网盘对于分片数量存在上限
-	if size/GB > 30 {
-		return 512 * MB
+	if size/utils.GB > 30 {
+		return 512 * utils.MB
 	}
-	return 100 * MB
+	return 100 * utils.MB
 }
 
 func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
 	switch d.Addition.Type {
 	case MetaPersonalNew:
 		var err error
 		fullHash := stream.GetHash().GetHash(utils.SHA256)
-		if len(fullHash) <= 0 {
-			tmpF, err := stream.CacheFullInTempFile()
-			if err != nil {
-				return err
-			}
-			fullHash, err = utils.HashFile(utils.SHA256, tmpF)
+		if len(fullHash) != utils.SHA256.Width {
+			_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, utils.SHA256)
 			if err != nil {
 				return err
 			}
 		}
 
-		partInfos := []PartInfo{}
-		var partSize = d.getPartSize(stream.GetSize())
-		part := (stream.GetSize() + partSize - 1) / partSize
-		if part == 0 {
+		size := stream.GetSize()
+		var partSize = d.getPartSize(size)
+		part := size / partSize
+		if size%partSize > 0 {
+			part++
+		} else if part == 0 {
 			part = 1
 		}
+		partInfos := make([]PartInfo, 0, part)
 		for i := int64(0); i < part; i++ {
 			if utils.IsCanceled(ctx) {
 				return ctx.Err()
 			}
 			start := i * partSize
-			byteSize := stream.GetSize() - start
+			byteSize := size - start
 			if byteSize > partSize {
 				byteSize = partSize
 			}
@@ -577,7 +568,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 			"contentType":    "application/octet-stream",
 			"parallelUpload": false,
 			"partInfos":      firstPartInfos,
-			"size":           stream.GetSize(),
+			"size":           size,
 			"parentFileId":   dstDir.GetID(),
 			"name":           stream.GetName(),
 			"type":           "file",
@@ -630,7 +621,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 		}
 
 		// Progress
-		p := driver.NewProgress(stream.GetSize(), up)
+		p := driver.NewProgress(size, up)
 
 		rateLimited := driver.NewLimitedUploadStream(ctx, stream)
 		// 上传所有分片
@@ -790,12 +781,14 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 			return fmt.Errorf("get file upload url failed with result code: %s, message: %s", resp.Data.Result.ResultCode, resp.Data.Result.ResultDesc)
 		}
 
+		size := stream.GetSize()
 		// Progress
-		p := driver.NewProgress(stream.GetSize(), up)
-
-		var partSize = d.getPartSize(stream.GetSize())
-		part := (stream.GetSize() + partSize - 1) / partSize
-		if part == 0 {
+		p := driver.NewProgress(size, up)
+		var partSize = d.getPartSize(size)
+		part := size / partSize
+		if size%partSize > 0 {
+			part++
+		} else if part == 0 {
 			part = 1
 		}
 		rateLimited := driver.NewLimitedUploadStream(ctx, stream)
@@ -805,10 +798,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 		}
 
 			start := i * partSize
-			byteSize := stream.GetSize() - start
-			if byteSize > partSize {
-				byteSize = partSize
-			}
+			byteSize := min(size-start, partSize)
 
 			limitReader := io.LimitReader(rateLimited, byteSize)
 			// Update Progress
@@ -820,7 +810,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
 
 			req = req.WithContext(ctx)
 			req.Header.Set("Content-Type", "text/plain;name="+unicode(stream.GetName()))
-			req.Header.Set("contentSize", strconv.FormatInt(stream.GetSize(), 10))
+			req.Header.Set("contentSize", strconv.FormatInt(size, 10))
 			req.Header.Set("range", fmt.Sprintf("bytes=%d-%d", start, start+byteSize-1))
 			req.Header.Set("uploadtaskID", resp.Data.UploadResult.UploadTaskID)
 			req.Header.Set("rangeType", "0")

drivers/139/util.go

Lines changed: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ func (d *Yun139) refreshToken() error {
 	if len(splits) < 3 {
 		return fmt.Errorf("authorization is invalid, splits < 3")
 	}
+	d.Account = splits[1]
 	strs := strings.Split(splits[2], "|")
 	if len(strs) < 4 {
 		return fmt.Errorf("authorization is invalid, strs < 4")
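d.Account is now assigned in refreshToken, where the Authorization value is already being split, instead of in Init (whose equivalent parsing is commented out above). Going by the checks visible in these diffs, the token decodes to a colon-separated string whose second field is the account; a hedged sketch of that parsing with a made-up token (the exact field layout is an assumption):

package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// accountFromAuthorization mirrors the parsing visible in the diff: base64-decode
// the Authorization value, split on ":", and take the second field as the account.
// The token layout here is an assumption based on the length checks above.
func accountFromAuthorization(auth string) (string, error) {
	decoded, err := base64.StdEncoding.DecodeString(auth)
	if err != nil {
		return "", err
	}
	splits := strings.Split(string(decoded), ":")
	if len(splits) < 3 {
		return "", fmt.Errorf("authorization is invalid, splits < 3")
	}
	return splits[1], nil
}

func main() {
	// A made-up token in the assumed "prefix:account:rest" shape.
	token := base64.StdEncoding.EncodeToString([]byte("pc:13800000000:tail|a|b|c"))
	account, err := accountFromAuthorization(token)
	fmt.Println(account, err) // 13800000000 <nil>
}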
