@@ -6,11 +6,9 @@ import (
66 "fmt"
77 "io"
88 "math/rand"
9- "mime"
109 "os"
1110 stdpath "path"
1211 "path/filepath"
13- "strconv"
1412 "strings"
1513 "time"
1614
@@ -21,30 +19,22 @@ import (
2119 "github.com/OpenListTeam/OpenList/v4/internal/op"
2220 "github.com/OpenListTeam/OpenList/v4/internal/stream"
2321 "github.com/OpenListTeam/OpenList/v4/internal/task"
22+ "github.com/OpenListTeam/OpenList/v4/internal/task_group"
23+ "github.com/OpenListTeam/OpenList/v4/pkg/utils"
24+ "github.com/OpenListTeam/OpenList/v4/server/common"
2425 "github.com/OpenListTeam/tache"
2526 "github.com/pkg/errors"
2627 log "github.com/sirupsen/logrus"
2728)
2829
2930type ArchiveDownloadTask struct {
30- task. TaskExtension
31+ TaskData
3132 model.ArchiveDecompressArgs
32- status string
33- SrcObjPath string
34- DstDirPath string
35- srcStorage driver.Driver
36- dstStorage driver.Driver
37- SrcStorageMp string
38- DstStorageMp string
3933}
4034
4135func (t * ArchiveDownloadTask ) GetName () string {
42- return fmt .Sprintf ("decompress [%s](%s)[%s] to [%s](%s) with password <%s>" , t .SrcStorageMp , t .SrcObjPath ,
43- t .InnerPath , t .DstStorageMp , t .DstDirPath , t .Password )
44- }
45-
46- func (t * ArchiveDownloadTask ) GetStatus () string {
47- return t .status
36+ return fmt .Sprintf ("decompress [%s](%s)[%s] to [%s](%s) with password <%s>" , t .SrcStorageMp , t .SrcActualPath ,
37+ t .InnerPath , t .DstStorageMp , t .DstActualPath , t .Password )
4838}
4939
5040func (t * ArchiveDownloadTask ) Run () error {
@@ -58,16 +48,21 @@ func (t *ArchiveDownloadTask) Run() error {
5848 if err != nil {
5949 return err
6050 }
51+ uploadTask .groupID = stdpath .Join (uploadTask .DstStorageMp , uploadTask .DstActualPath )
52+ task_group .TransferCoordinator .AddTask (uploadTask .groupID , nil )
6153 ArchiveContentUploadTaskManager .Add (uploadTask )
6254 return nil
6355}
6456
6557func (t * ArchiveDownloadTask ) RunWithoutPushUploadTask () (* ArchiveContentUploadTask , error ) {
6658 var err error
67- if t .srcStorage == nil {
68- t .srcStorage , err = op .GetStorageByMountPath (t .SrcStorageMp )
59+ if t .SrcStorage == nil {
60+ t .SrcStorage , err = op .GetStorageByMountPath (t .SrcStorageMp )
61+ if err != nil {
62+ return nil , err
63+ }
6964 }
70- srcObj , tool , ss , err := op .GetArchiveToolAndStream (t .Ctx (), t .srcStorage , t .SrcObjPath , model.LinkArgs {})
65+ srcObj , tool , ss , err := op .GetArchiveToolAndStream (t .Ctx (), t .SrcStorage , t .SrcActualPath , model.LinkArgs {})
7166 if err != nil {
7267 return nil , err
7368 }
@@ -87,7 +82,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
8782 total += s .GetSize ()
8883 }
8984 t .SetTotalBytes (total )
90- t .status = "getting src object"
85+ t .Status = "getting src object"
9186 for _ , s := range ss {
9287 if s .GetFile () == nil {
9388 _ , err = stream .CacheFullInTempFileAndWriter (s , func (p float64 ) {
@@ -104,7 +99,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
10499 } else {
105100 decompressUp = t .SetProgress
106101 }
107- t .status = "walking and decompressing"
102+ t .Status = "walking and decompressing"
108103 dir , err := os .MkdirTemp (conf .Conf .TempDir , "dir-*" )
109104 if err != nil {
110105 return nil , err
@@ -117,13 +112,14 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
117112 uploadTask := & ArchiveContentUploadTask {
118113 TaskExtension : task.TaskExtension {
119114 Creator : t .GetCreator (),
115+ ApiUrl : t .ApiUrl ,
120116 },
121- ObjName : baseName ,
122- InPlace : ! t .PutIntoNewDir ,
123- FilePath : dir ,
124- DstDirPath : t . DstDirPath ,
125- dstStorage : t . dstStorage ,
126- DstStorageMp : t .DstStorageMp ,
117+ ObjName : baseName ,
118+ InPlace : ! t .PutIntoNewDir ,
119+ FilePath : dir ,
120+ DstActualPath : t . DstActualPath ,
121+ dstStorage : t . DstStorage ,
122+ DstStorageMp : t .DstStorageMp ,
127123 }
128124 return uploadTask , nil
129125}
@@ -132,18 +128,19 @@ var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
132128
133129type ArchiveContentUploadTask struct {
134130 task.TaskExtension
135- status string
136- ObjName string
137- InPlace bool
138- FilePath string
139- DstDirPath string
140- dstStorage driver.Driver
141- DstStorageMp string
142- finalized bool
131+ status string
132+ ObjName string
133+ InPlace bool
134+ FilePath string
135+ DstActualPath string
136+ dstStorage driver.Driver
137+ DstStorageMp string
138+ finalized bool
139+ groupID string
143140}
144141
145142func (t * ArchiveContentUploadTask ) GetName () string {
146- return fmt .Sprintf ("upload %s to [%s](%s)" , t .ObjName , t .DstStorageMp , t .DstDirPath )
143+ return fmt .Sprintf ("upload %s to [%s](%s)" , t .ObjName , t .DstStorageMp , t .DstActualPath )
147144}
148145
149146func (t * ArchiveContentUploadTask ) GetStatus () string {
@@ -163,21 +160,42 @@ func (t *ArchiveContentUploadTask) Run() error {
163160 })
164161}
165162
166- func (t * ArchiveContentUploadTask ) RunWithNextTaskCallback (f func (nextTsk * ArchiveContentUploadTask ) error ) error {
163+ func (t * ArchiveContentUploadTask ) OnSucceeded () {
164+ task_group .TransferCoordinator .Done (t .groupID , true )
165+ }
166+
167+ func (t * ArchiveContentUploadTask ) OnFailed () {
168+ task_group .TransferCoordinator .Done (t .groupID , false )
169+ }
170+
171+ func (t * ArchiveContentUploadTask ) SetRetry (retry int , maxRetry int ) {
172+ t .TaskExtension .SetRetry (retry , maxRetry )
173+ if retry == 0 &&
174+ (len (t .groupID ) == 0 || // 重启恢复
175+ (t .GetErr () == nil && t .GetState () != tache .StatePending )) { // 手动重试
176+ t .groupID = stdpath .Join (t .DstStorageMp , t .DstActualPath )
177+ task_group .TransferCoordinator .AddTask (t .groupID , nil )
178+ }
179+ }
180+
181+ func (t * ArchiveContentUploadTask ) RunWithNextTaskCallback (f func (nextTask * ArchiveContentUploadTask ) error ) error {
167182 var err error
168183 if t .dstStorage == nil {
169184 t .dstStorage , err = op .GetStorageByMountPath (t .DstStorageMp )
185+ if err != nil {
186+ return err
187+ }
170188 }
171189 info , err := os .Stat (t .FilePath )
172190 if err != nil {
173191 return err
174192 }
175193 if info .IsDir () {
176194 t .status = "src object is dir, listing objs"
177- nextDstPath := t .DstDirPath
195+ nextDstActualPath := t .DstActualPath
178196 if ! t .InPlace {
179- nextDstPath = stdpath .Join (nextDstPath , t .ObjName )
180- err = op .MakeDir (t .Ctx (), t .dstStorage , nextDstPath )
197+ nextDstActualPath = stdpath .Join (nextDstActualPath , t .ObjName )
198+ err = op .MakeDir (t .Ctx (), t .dstStorage , nextDstActualPath )
181199 if err != nil {
182200 return err
183201 }
@@ -186,6 +204,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
186204 if err != nil {
187205 return err
188206 }
207+ if ! t .InPlace && len (t .groupID ) > 0 {
208+ task_group .TransferCoordinator .AppendPayload (t .groupID , task_group .DstPathToRefresh (nextDstActualPath ))
209+ }
189210 var es error
190211 for _ , entry := range entries {
191212 var nextFilePath string
@@ -198,16 +219,21 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
198219 es = stderrors .Join (es , err )
199220 continue
200221 }
222+ if len (t .groupID ) > 0 {
223+ task_group .TransferCoordinator .AddTask (t .groupID , nil )
224+ }
201225 err = f (& ArchiveContentUploadTask {
202226 TaskExtension : task.TaskExtension {
203227 Creator : t .GetCreator (),
228+ ApiUrl : t .ApiUrl ,
204229 },
205- ObjName : entry .Name (),
206- InPlace : false ,
207- FilePath : nextFilePath ,
208- DstDirPath : nextDstPath ,
209- dstStorage : t .dstStorage ,
210- DstStorageMp : t .DstStorageMp ,
230+ ObjName : entry .Name (),
231+ InPlace : false ,
232+ FilePath : nextFilePath ,
233+ DstActualPath : nextDstActualPath ,
234+ dstStorage : t .dstStorage ,
235+ DstStorageMp : t .DstStorageMp ,
236+ groupID : t .groupID ,
211237 })
212238 if err != nil {
213239 es = stderrors .Join (es , err )
@@ -228,13 +254,13 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
228254 Size : info .Size (),
229255 Modified : time .Now (),
230256 },
231- Mimetype : mime . TypeByExtension ( filepath .Ext (t .ObjName )),
257+ Mimetype : utils . GetMimeType ( stdpath .Ext (t .ObjName )),
232258 WebPutAsTask : true ,
233259 Reader : file ,
234260 }
235261 fs .Closers .Add (file )
236262 t .status = "uploading"
237- err = op .Put (t .Ctx (), t .dstStorage , t .DstDirPath , fs , t .SetProgress , true )
263+ err = op .Put (t .Ctx (), t .dstStorage , t .DstActualPath , fs , t .SetProgress , true )
238264 if err != nil {
239265 return err
240266 }
@@ -271,8 +297,9 @@ func moveToTempPath(path, prefix string) (string, error) {
271297
272298func genTempFileName (prefix string ) (string , error ) {
273299 retry := 0
300+ t := time .Now ().UnixMilli ()
274301 for retry < 10000 {
275- newPath := stdpath .Join (conf .Conf .TempDir , prefix + strconv . FormatUint ( uint64 ( rand .Uint32 ()), 10 ))
302+ newPath := filepath .Join (conf .Conf .TempDir , prefix + fmt . Sprintf ( "%x-%x" , t , rand .Uint32 ()))
276303 if _ , err := os .Stat (newPath ); err != nil {
277304 if os .IsNotExist (err ) {
278305 return newPath , nil
@@ -354,16 +381,19 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
354381 }
355382 taskCreator , _ := ctx .Value (conf .UserKey ).(* model.User )
356383 tsk := & ArchiveDownloadTask {
357- TaskExtension : task.TaskExtension {
358- Creator : taskCreator ,
384+ TaskData : TaskData {
385+ TaskExtension : task.TaskExtension {
386+ Creator : taskCreator ,
387+ ApiUrl : common .GetApiUrl (ctx ),
388+ },
389+ SrcStorage : srcStorage ,
390+ DstStorage : dstStorage ,
391+ SrcActualPath : srcObjActualPath ,
392+ DstActualPath : dstDirActualPath ,
393+ SrcStorageMp : srcStorage .GetStorage ().MountPath ,
394+ DstStorageMp : dstStorage .GetStorage ().MountPath ,
359395 },
360396 ArchiveDecompressArgs : args ,
361- srcStorage : srcStorage ,
362- dstStorage : dstStorage ,
363- SrcObjPath : srcObjActualPath ,
364- DstDirPath : dstDirActualPath ,
365- SrcStorageMp : srcStorage .GetStorage ().MountPath ,
366- DstStorageMp : dstStorage .GetStorage ().MountPath ,
367397 }
368398 if ctx .Value (conf .NoTaskKey ) != nil {
369399 uploadTask , err := tsk .RunWithoutPushUploadTask ()
0 commit comments