// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"hash"
	"net/http"
	"net/url"
	"runtime"
	"strings"
	"sync"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-azcopy/v10/common"
	"github.com/Azure/azure-storage-blob-go/azblob"
)
// This code for blob tier safety is _not_ safe for multiple jobs at once.
// That's alright, but it's good to know on the off chance.
// This sync.Once ensures that we output information about an S2S access tier preservation failure to stdout only once
var tierNotAllowedFailure sync.Once
var checkLengthFailureOnReadOnlyDst sync.Once
// This sync.Once and string pair ensures that we only get a user's destination account kind once when handling set-tier
// Premium block blob doesn't support tiering, and page blobs only support P1-80.
// There are also size restrictions on tiering.
var destAccountSKU string
var destAccountKind string
var tierSetPossibleFail bool
var getDestAccountInfo sync.Once
var getDestAccountInfoError error
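// prepareDestAccountInfo looks up the destination account's SKU and kind exactly once per process
// (via the getDestAccountInfo sync.Once above), so later set-tier decisions don't repeat the
// GetAccountInfo call. If the lookup fails and mustGet is true, the active send is failed with
// TierAvailabilityCheckFailure; otherwise we just note that setting the tier may fail and continue.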
func prepareDestAccountInfo(bURL azblob.BlobURL, jptm IJobPartTransferMgr, ctx context.Context, mustGet bool) {
getDestAccountInfo.Do(func() {
infoResp, err := bURL.GetAccountInfo(ctx)
if err != nil {
// If GetAccountInfo fails, this transfer should fail, because we lack at least one necessary permission,
// UNLESS the user is using OAuth, in which case the account owner can still get the info.
// So, if we fail to get the info under OAuth, don't fail the transfer.
// (https://docs.microsoft.com/en-us/rest/api/storageservices/get-account-information#authorization)
if mustGet {
getDestAccountInfoError = err
} else {
tierSetPossibleFail = true
glcm := common.GetLifecycleMgr()
glcm.Info("Transfers are likely to fail because destination does not support tiers.")
destAccountSKU = "failget"
destAccountKind = "failget"
}
} else {
destAccountSKU = string(infoResp.SkuName())
destAccountKind = string(infoResp.AccountKind())
}
})
if getDestAccountInfoError != nil {
jptm.FailActiveSendWithStatus("Checking destination tier availability (Set blob tier) ", getDestAccountInfoError, common.ETransferStatus.TierAvailabilityCheckFailure())
}
}
// TODO: Infer availability based upon blob size as well, for premium page blobs.
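// BlobTierAllowed reports whether the destination account (as recorded by prepareDestAccountInfo)
// can plausibly accept the requested access tier. It is a best-effort check: if the account info
// could not be fetched, it optimistically returns true and lets the service make the final call.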
func BlobTierAllowed(destTier azblob.AccessTierType) bool {
// If we failed to get the account info, just return true.
// This is because we can't infer whether it's possible or not, and the setTier operation could possibly succeed (or fail)
if tierSetPossibleFail {
return true
}
// If the account is premium, Storage/StorageV2 supports tiering only for page blobs (tiers P1-80); block blobs do not support tiering at all.
if strings.Contains(destAccountSKU, "Premium") {
// storage V1/V2
if destAccountKind == "StorageV2" {
// P1-80 possible.
return premiumPageBlobTierRegex.MatchString(string(destTier))
}
if destAccountKind == "Storage" {
// No tier setting is allowed.
return false
}
if strings.Contains(destAccountKind, "Block") {
// No tier setting is allowed.
return false
}
// Any other storage type would have to be file storage, and we can't set tier there.
panic("Cannot set tier on azure files.")
} else {
if destAccountKind == "Storage" { // Tier setting not allowed on classic accounts
return false
}
// Standard storage account. If it's Hot, Cool, or Archive, we're A-OK.
// Page blobs don't have an access tier on Standard accounts, but that's also OK,
// because the page blob sender code prevents us from using a standard access tier type.
return destTier == azblob.AccessTierArchive || destTier == azblob.AccessTierCool || destTier == azblob.AccessTierHot
}
}
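// ValidateTier checks whether the requested blob tier can be set on the destination, for live
// transfers with a tier other than AccessTierNone. If it can't, and performQuietly is false, a
// one-time notice is printed suggesting --s2s-preserve-access-tier=false, and the caller is
// expected to fall back to the default tier.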
func ValidateTier(jptm IJobPartTransferMgr, blobTier azblob.AccessTierType, blobURL azblob.BlobURL, ctx context.Context, performQuietly bool) (isValid bool) {
if jptm.IsLive() && blobTier != azblob.AccessTierNone {
// Let's check whether we can confirm we'll be able to read the destination blob's account info.
// A SAS token, even with write-only permissions, is enough. So is OAuth with the account owner.
// We can't guess that last part, so we take a gamble and try to get the account info anyway.
destParts := azblob.NewBlobURLParts(blobURL.URL())
mustGet := destParts.SAS.Encode() != ""
prepareDestAccountInfo(blobURL, jptm, ctx, mustGet)
tierAvailable := BlobTierAllowed(blobTier)
if tierAvailable {
return true
} else if !performQuietly {
tierNotAllowedFailure.Do(func() {
glcm := common.GetLifecycleMgr()
glcm.Info("Destination could not accommodate the tier " + string(blobTier) + ". Going ahead with the default tier. In case of service to service transfer, consider setting the flag --s2s-preserve-access-tier=false.")
})
}
return false
} else {
return false
}
}
// xfer.go requires just a single xfer function for the whole job.
// This routine serves that role for uploads and S2S copies, and redirects each transfer to a file or folder implementation
func anyToRemote(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) {
info := jptm.Info()
fromTo := jptm.FromTo()
// Ensure that the source and destination are not the same item, and fail the transfer if they are.
// This scenario can only happen with S2S. We'll parse the URLs and compare the host and path.
if fromTo.IsS2S() {
srcURL, err := url.Parse(info.Source)
common.PanicIfErr(err)
dstURL, err := url.Parse(info.Destination)
common.PanicIfErr(err)
if srcURL.Hostname() == dstURL.Hostname() &&
srcURL.EscapedPath() == dstURL.EscapedPath() {
// src and dst point to the same object
// if src does not have a snapshot/versionId, then error out, as we cannot copy an object onto itself
// if dst has a snapshot or versionId specified, do not error out; let the service fail the request with a clear message
srcRQ := srcURL.Query()
if len(srcRQ["sharesnapshot"]) == 0 && len(srcRQ["snapshot"]) == 0 && len(srcRQ["versionid"]) == 0 {
jptm.LogSendError(info.Source, info.Destination, "Transfer source and destination are the same, which would cause data loss. Aborting transfer.", 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
}
}
if info.IsFolderPropertiesTransfer() {
anyToRemote_folder(jptm, info, p, pacer, senderFactory, sipf)
} else {
anyToRemote_file(jptm, info, p, pacer, senderFactory, sipf)
}
}
// anyToRemote_file handles all kinds of sender operations for files - both uploads from local files, and S2S copies
func anyToRemote_file(jptm IJobPartTransferMgr, info TransferInfo, p pipeline.Pipeline, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) {
pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.XferStart())
defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())
srcSize := info.SourceSize
// step 1. perform initial checks
if jptm.WasCanceled() {
/* This is the earliest we detect jptm has been cancelled before scheduling chunks */
jptm.SetStatus(common.ETransferStatus.Cancelled())
jptm.ReportTransferDone()
return
}
// step 2a. Create sender
srcInfoProvider, err := sipf(jptm)
if err != nil {
jptm.LogSendError(info.Source, info.Destination, err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
if srcInfoProvider.EntityType() != common.EEntityType.File() {
panic("configuration error. Source Info Provider does not have File entity type")
}
s, err := senderFactory(jptm, info.Destination, p, pacer, srcInfoProvider)
if err != nil {
jptm.LogSendError(info.Source, info.Destination, err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
// step 2b. Read chunk size and count from the sender (since it may have applied its own defaults and/or calculations to produce these values)
numChunks := s.NumChunks()
if jptm.ShouldLog(pipeline.LogInfo) {
jptm.LogTransferStart(info.Source, info.Destination, fmt.Sprintf("Specified chunk size %d", s.ChunkSize()))
}
if s.NumChunks() == 0 {
panic("must always schedule one chunk, even if file is empty") // this keeps our code structure simpler, by using a dummy chunk for empty files
}
// step 3: check overwrite option
// if the force-write flag is set to false or prompt,
// then check whether the file exists at the remote location
// and, if it does, react accordingly
if jptm.GetOverwriteOption() != common.EOverwriteOption.True() {
exists, dstLmt, existenceErr := s.RemoteFileExists()
if existenceErr != nil {
jptm.LogSendError(info.Source, info.Destination, "Could not check destination file existence. "+existenceErr.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed()) // is a real failure, not just a SkippedFileAlreadyExists, in this case
jptm.ReportTransferDone()
return
}
if exists {
shouldOverwrite := false
// if necessary, prompt to confirm user's intent
if jptm.GetOverwriteOption() == common.EOverwriteOption.Prompt() {
// remove the SAS before prompting the user
parsed, _ := url.Parse(info.Destination)
parsed.RawQuery = ""
shouldOverwrite = jptm.GetOverwritePrompter().ShouldOverwrite(parsed.String(), common.EEntityType.File())
} else if jptm.GetOverwriteOption() == common.EOverwriteOption.IfSourceNewer() {
// only overwrite if the source LMT is newer than (i.e. after) the destination's
if jptm.LastModifiedTime().After(dstLmt) {
shouldOverwrite = true
}
}
if !shouldOverwrite {
// logging as Warning so that it turns up even in compact logs, and because previously we used Error here
jptm.LogAtLevelForCurrentTransfer(pipeline.LogWarning, "File already exists, so will be skipped")
jptm.SetStatus(common.ETransferStatus.SkippedEntityAlreadyExists())
jptm.ReportTransferDone()
return
}
}
}
// step 4: Open the local Source File (if any)
common.GetLifecycleMgr().E2EAwaitAllowOpenFiles()
jptm.LogChunkStatus(pseudoId, common.EWaitReason.OpenLocalSource())
var sourceFileFactory func() (common.CloseableReaderAt, error)
srcFile := (common.CloseableReaderAt)(nil)
if srcInfoProvider.IsLocal() {
sourceFileFactory = srcInfoProvider.(ILocalSourceInfoProvider).OpenSourceFile // all local providers must implement this interface
srcFile, err = sourceFileFactory()
if err != nil {
suffix := ""
if strings.Contains(err.Error(), "Access is denied") && runtime.GOOS == "windows" {
suffix = " See --" + common.BackupModeFlagName + " flag if you need to read all files regardless of their permissions"
}
jptm.LogSendError(info.Source, info.Destination, "Couldn't open source. "+err.Error()+suffix, 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
defer srcFile.Close() // we read all the chunks in this routine, so we can close the file at the end
}
// We always do LMT verification after the transfer. Also do it here, before the transfer, when:
// 1) The source is local and its size is > 1 chunk. (Why not always? Because getting the LMT is not "free" at very small sizes.)
// 2) The source is remote, i.e. the S2S copy case, and its size is larger than one chunk. Verifying here can save the cost of the transfer.
jptm.LogChunkStatus(pseudoId, common.EWaitReason.ModifiedTimeRefresh())
if _, isS2SCopier := s.(s2sCopier); numChunks > 1 &&
(srcInfoProvider.IsLocal() || isS2SCopier && info.S2SSourceChangeValidation) {
lmt, err := srcInfoProvider.GetFreshFileLastModifiedTime()
if err != nil {
jptm.LogSendError(info.Source, info.Destination, "Couldn't get source's last modified time-"+err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
if !lmt.Equal(jptm.LastModifiedTime()) {
jptm.LogSendError(info.Source, info.Destination, "File modified since transfer scheduled", 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
}
// step 5a: lock the destination
// (it is safe to do this relatively early here, before we run the prologue, because it's just an internal lock within the app)
// But it must come after all of the early returns above (since,
// if we succeed here, we need to know the epilogue will definitely run to unlock later)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.LockDestination())
err = jptm.WaitUntilLockDestination(jptm.Context())
if err != nil {
jptm.LogSendError(info.Source, info.Destination, err.Error(), 0)
jptm.SetStatus(common.ETransferStatus.Failed())
jptm.ReportTransferDone()
return
}
// *****
// Error-handling rules change here.
// ABOVE this point, we end the transfer using the code as shown above.
// BELOW this point, this routine always schedules the expected number
// of chunks, even if it has seen a failure, and the
// workers (the chunkfunc implementations) must use
// jptm.FailActiveSend when there's an error.
// TODO: are we comfortable with this approach?
// DECISION: 16 Jan, 2019: for now, we are leaving in place the above rule that the number of completed chunks must
// eventually reach numChunks, since we have no better short-term alternative.
// ******
// step 5b: tell jptm what to expect, and how to clean up at the end
jptm.SetNumberOfChunks(numChunks)
jptm.SetActionAfterLastChunk(func() { epilogueWithCleanupSendToRemote(jptm, s, srcInfoProvider) })
// stop tracking pseudo id (since real chunk id's will be tracked from here on)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone())
// Step 6: Go through the file and schedule chunk messages to send each chunk
scheduleSendChunks(jptm, info.Source, srcFile, srcSize, s, sourceFileFactory, srcInfoProvider)
}
var jobCancelledLocalPrefetchErr = errors.New("job was cancelled; Pre-fetching stopped")
// Schedule all the send chunks.
// For upload, we force preload of each chunk to memory, and we wait (block)
// here if the amount of preloaded data gets excessive. That's OK to do,
// because if we already have that much data preloaded (and scheduled for sending in
// chunks) then we don't need to schedule any more chunks right now, so the blocking
// is harmless (and a good thing, to avoid excessive RAM usage).
// To take advantage of the good sequential read performance provided by many file systems,
// and to be able to compute an MD5 hash for the file, we work sequentially through the file here.
func scheduleSendChunks(jptm IJobPartTransferMgr, srcPath string, srcFile common.CloseableReaderAt, srcSize int64, s sender, sourceFileFactory common.ChunkReaderSourceFactory, srcInfoProvider ISourceInfoProvider) {
// For generic send
chunkSize := s.ChunkSize()
numChunks := s.NumChunks()
// For upload
var md5Channel chan<- []byte
var prefetchErr error
var chunkReader common.SingleChunkReader
ps := common.PrologueState{}
var md5Hasher hash.Hash
if jptm.ShouldPutMd5() {
md5Hasher = md5.New()
} else {
md5Hasher = common.NewNullHasher()
}
safeToUseHash := true
if srcInfoProvider.IsLocal() {
md5Channel = s.(uploader).Md5Channel()
defer close(md5Channel)
}
chunkIDCount := int32(0)
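// Walk the source sequentially, carving it into chunkSize-sized chunks with a short final chunk.
// For example (illustrative only): a 20 MiB file with an 8 MiB chunk size yields chunks of
// 8, 8 and 4 MiB; a zero-byte file still gets exactly one dummy chunk (see isDummyChunkInEmptyFile).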
for startIndex := int64(0); startIndex < srcSize || isDummyChunkInEmptyFile(startIndex, srcSize); startIndex += int64(chunkSize) {
adjustedChunkSize := int64(chunkSize)
// compute actual size of the chunk
if startIndex+int64(chunkSize) > srcSize {
adjustedChunkSize = srcSize - startIndex
}
id := common.NewChunkID(srcPath, startIndex, adjustedChunkSize) // TODO: stop using adjustedChunkSize, below, and use the size that's in the ID
if srcInfoProvider.IsLocal() {
if jptm.WasCanceled() {
prefetchErr = jobCancelledLocalPrefetchErr
} else {
// As long as the prefetch error is nil, we'll attempt a prefetch.
// Otherwise, there's no need to create the chunk reader at all:
// prefetching here would be a waste of time if we already know we can't upload,
// and skipping it also prevents prefetchErr from changing under us.
if prefetchErr == nil {
// create reader and prefetch the data into it
chunkReader = createPopulatedChunkReader(jptm, sourceFileFactory, id, adjustedChunkSize, srcFile)
// Wait until we have enough RAM, and when we do, prefetch the data for this chunk.
prefetchErr = chunkReader.BlockingPrefetch(srcFile, false)
if prefetchErr == nil {
// *** NOTE: the hasher hashes the buffer as it is right now. IF the chunk upload fails, then
// the chunkReader will repeat the read from disk. So there is an essential dependency
// between the hashing and our change detection logic.
common.DocumentationForDependencyOnChangeDetection() // <-- read the documentation here ***
chunkReader.WriteBufferTo(md5Hasher)
ps = chunkReader.GetPrologueState()
} else {
safeToUseHash = false // because we've missed a chunk
}
}
}
}
// If this is the very first chunk, do special init steps
if startIndex == 0 {
// Run the prologue before the first chunk is scheduled.
// If the file is not local, we'll get no leading bytes, but we still run the prologue in case
// there's other initialization to do in the sender.
modified := s.Prologue(ps)
if modified {
jptm.SetDestinationIsModified()
}
}
// schedule the chunk job/msg
jptm.LogChunkStatus(id, common.EWaitReason.WorkerGR())
isWholeFile := numChunks == 1
var cf chunkFunc
if srcInfoProvider.IsLocal() {
if prefetchErr == nil {
cf = s.(uploader).GenerateUploadFunc(id, chunkIDCount, chunkReader, isWholeFile)
} else {
if chunkReader != nil {
_ = chunkReader.Close()
}
// Our jptm logic currently requires us to schedule every chunk, even if we know there's an error,
// so we schedule a func that will just fail with the given error
cf = createSendToRemoteChunkFunc(jptm, id, func() { jptm.FailActiveSend("chunk data read", prefetchErr) })
}
} else {
cf = s.(s2sCopier).GenerateCopyFunc(id, chunkIDCount, adjustedChunkSize, isWholeFile)
}
jptm.ScheduleChunks(cf)
chunkIDCount++
}
// sanity check to verify the number of chunks scheduled
if chunkIDCount != int32(numChunks) {
panic(fmt.Errorf("difference in the number of chunk calculated %v and actual chunks scheduled %v for src %s of size %v", numChunks, chunkIDCount, srcPath, srcSize))
}
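// Only hand the whole-file hash to the uploader if every chunk's buffer actually went through the
// hasher; if any prefetch failed, safeToUseHash was cleared above and we deliberately send nothing.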
if srcInfoProvider.IsLocal() && safeToUseHash {
md5Channel <- md5Hasher.Sum(nil)
}
}
// Make reader for this chunk.
// Each chunk reader also gets a factory to make a reader for the file, in case it needs to repeat its part
// of the file read later (when doing a retry)
// BTW, the reader we create here works with just a single chunk. (That's in contrast with downloads, where we have
// to use an object that encompasses the whole file, so that it can put the chunks back into order. We don't have that requirement here.)
func createPopulatedChunkReader(jptm IJobPartTransferMgr, sourceFileFactory common.ChunkReaderSourceFactory, id common.ChunkID, adjustedChunkSize int64, srcFile common.CloseableReaderAt) common.SingleChunkReader {
chunkReader := common.NewSingleChunkReader(jptm.Context(),
sourceFileFactory,
id,
adjustedChunkSize,
jptm.ChunkStatusLogger(),
jptm,
jptm.SlicePool(),
jptm.CacheLimiter())
return chunkReader
}
func isDummyChunkInEmptyFile(startIndex int64, fileSize int64) bool {
return startIndex == 0 && fileSize == 0
}
// Complete epilogue. Handles both success and failure.
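// Broadly, the steps are: re-check the source LMT for modification (local and change-validated S2S
// transfers only), run the sender's Epilogue, optionally validate the destination length, release
// the destination lock via Cleanup if this jptm holds it, then finish with commonSenderCompletion.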
func epilogueWithCleanupSendToRemote(jptm IJobPartTransferMgr, s sender, sip ISourceInfoProvider) {
info := jptm.Info()
// allow our usual state tracking mechanism to keep count of how many epilogues are running at any given instant, for perf diagnostics
pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source)
jptm.LogChunkStatus(pseudoId, common.EWaitReason.Epilogue())
defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids
if jptm.WasCanceled() {
// This is where we detect that the transfer has been cancelled. Further statements do not act on
// the dead jptm. We set the status here.
jptm.SetStatus(common.ETransferStatus.Cancelled())
}
if jptm.IsLive() {
if _, isS2SCopier := s.(s2sCopier); sip.IsLocal() || (isS2SCopier && info.S2SSourceChangeValidation) {
// Check the source to see if it was changed during transfer. If it was, mark the transfer as failed.
lmt, err := sip.GetFreshFileLastModifiedTime()
if err != nil {
jptm.FailActiveSend("epilogueWithCleanupSendToRemote", err)
}
if !lmt.Equal(jptm.LastModifiedTime()) {
// **** Note that this check is ESSENTIAL and not just for the obvious reason of not wanting to upload
// corrupt or inconsistent data. It's also essential to the integrity of our MD5 hashes.
common.DocumentationForDependencyOnChangeDetection() // <-- read the documentation here ***
jptm.FailActiveSend("epilogueWithCleanupSendToRemote", errors.New("source modified during transfer"))
}
}
}
// TODO: should we refactor to force this to accept jptm isLive as a parameter, to encourage it to be checked?
// or should we redefine epilogue to be success-path only, and only call it in that case?
s.Epilogue() // Perform service-specific cleanup before jptm cleanup. Some services may require this step to make the file actually appear.
if jptm.IsLive() && info.DestLengthValidation {
_, isS2SCopier := s.(s2sCopier)
shouldCheckLength := true
destLength, err := s.GetDestinationLength()
if resp, respOk := err.(pipeline.Response); respOk && resp.Response() != nil &&
resp.Response().StatusCode == http.StatusForbidden {
// The destination is write-only. Cannot verify length
shouldCheckLength = false
checkLengthFailureOnReadOnlyDst.Do(func() {
var glcm = common.GetLifecycleMgr()
msg := "Could not read destination length. If the destination is write-only, use --check-length=false on the command line."
glcm.Info(msg)
if jptm.ShouldLog(pipeline.LogError) {
jptm.Log(pipeline.LogError, msg)
}
})
}
if shouldCheckLength {
if err != nil {
wrapped := fmt.Errorf("Could not read destination length. %w", err)
jptm.FailActiveSend(common.IffString(isS2SCopier, "S2S ", "Upload ")+"Length check: Get destination length", wrapped)
} else if destLength != jptm.Info().SourceSize {
jptm.FailActiveSend(common.IffString(isS2SCopier, "S2S ", "Upload ")+"Length check", errors.New("destination length does not match source length"))
}
}
}
if jptm.HoldsDestinationLock() { // TODO: consider adding a test of jptm.IsDeadInflight here, so we can remove that from inside all the cleanup methods
s.Cleanup() // Perform jptm cleanup, if THIS jptm has the lock on the destination
}
commonSenderCompletion(jptm, s, info)
}
// commonSenderCompletion is used for both files and folders
func commonSenderCompletion(jptm IJobPartTransferMgr, s sender, info TransferInfo) {
jptm.EnsureDestinationUnlocked()
if jptm.TransferStatusIgnoringCancellation() == 0 {
panic("think we're finished but status is notStarted")
}
// note that we do not really know whether the context was canceled because of an error, or because the user asked for it
// if it was an intentional cancel, the status is still "in progress", so we are still counting it as pending
// we leave that transfer status alone
// in case of errors, the status was already set, so we don't need to do anything here either
//
// it is entirely possible that all the chunks were finished, but that by the time we get to this line
// the context is canceled. In this case, a completely transferred file would not be marked "completed".
// it's definitely a case that we should be aware of, but given how rare it is, and how low the impact is (the user can just resume), we don't have to do anything more about it atm.
if jptm.IsLive() {
// We know all chunks are done (because this routine was called)
// and we know the transfer didn't fail (because we just checked its status above and made sure the context was not canceled),
// so it must have succeeded. So make sure it's not left in the "in progress" state
jptm.SetStatus(common.ETransferStatus.Success())
// Final logging
if jptm.ShouldLog(pipeline.LogInfo) { // TODO: question: can we remove these ShouldLogs? Aren't they inside Log?
if _, ok := s.(s2sCopier); ok {
jptm.Log(pipeline.LogInfo, fmt.Sprintf("COPYSUCCESSFUL: %s%s", info.entityTypeLogIndicator(), strings.Split(info.Destination, "?")[0]))
} else if _, ok := s.(uploader); ok {
// Output relative path of file, includes file name.
jptm.Log(pipeline.LogInfo, fmt.Sprintf("UPLOADSUCCESSFUL: %s%s", info.entityTypeLogIndicator(), strings.Split(info.Destination, "?")[0]))
} else {
panic("invalid state: epilogueWithCleanupSendToRemote should be used by COPY and UPLOAD")
}
}
if jptm.ShouldLog(pipeline.LogDebug) {
jptm.Log(pipeline.LogDebug, "Finalizing Transfer")
}
} else {
if jptm.ShouldLog(pipeline.LogDebug) {
jptm.Log(pipeline.LogDebug, "Finalizing Transfer Cancellation/Failure")
}
}
// successful or unsuccessful, it's definitely over
jptm.ReportTransferDone()
}