Skip to content

Commit

Permalink
Create a log file even when there's nothing to do (#566)
Browse files Browse the repository at this point in the history
* Create a log file even when there's nothing to do

* Comments, strings, error enum, path separator

* Tell me what is going wrong

* Only test transfer count on final scheduling

* Check in both places for an error anyway

* Only send error on final part

* Single err var

* Path separator, special error for remove

* Fix test suite's result mocking for copyjobpartorder

* Cleanly exit sync if nothing is scheduled
  • Loading branch information
adreed-msft committed Sep 17, 2019
1 parent 24a8ec3 commit 3011f6d
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/copy.go
Expand Up @@ -1010,7 +1010,7 @@ func (cca *cookedCopyCmdArgs) processCopyJobPartOrders() (err error) {
// if blocking is specified to false, then another goroutine spawns and wait out the job
func (cca *cookedCopyCmdArgs) waitUntilJobCompletion(blocking bool) {
// print initial message to indicate that the job is starting
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s/%s.log", azcopyLogPathFolder, cca.jobID)))
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)))

// initialize the times necessary to track progress
cca.jobStartTime = time.Now()
Expand Down
7 changes: 7 additions & 0 deletions cmd/copyEnumeratorHelper.go
Expand Up @@ -65,6 +65,13 @@ func dispatchFinalPart(e *common.CopyJobPartOrderRequest, cca *cookedCopyCmdArgs
Rpc(common.ERpcCmd.CopyJobPartOrder(), (*common.CopyJobPartOrderRequest)(e), &resp)

if !resp.JobStarted {
// Output the log location and such
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)))

if resp.ErrorMsg == common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr() {
return NothingScheduledError
}

return fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s", e.JobID, e.PartNum, resp.ErrorMsg)
}

Expand Down
4 changes: 0 additions & 4 deletions cmd/copyEnumeratorInit.go
Expand Up @@ -217,10 +217,6 @@ func (cca *cookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
return addTransfer(&jobPartOrder, transfer, cca)
}
finalizer := func() error {
if len(jobPartOrder.Transfers) == 0 {
return errors.New("cannot find source to upload")
}

return dispatchFinalPart(&jobPartOrder, cca)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jobsResume.go
Expand Up @@ -56,7 +56,7 @@ type resumeJobController struct {
// if blocking is specified to false, then another goroutine spawns and wait out the job
func (cca *resumeJobController) waitUntilJobCompletion(blocking bool) {
// print initial message to indicate that the job is starting
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s/%s.log", azcopyLogPathFolder, cca.jobID)))
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)))

// initialize the times necessary to track progress
cca.jobStartTime = time.Now()
Expand Down
5 changes: 5 additions & 0 deletions cmd/removeEnumerator.go
Expand Up @@ -73,6 +73,11 @@ func newRemoveEnumerator(cca *cookedCopyCmdArgs) (enumerator *copyEnumerator, er
finalize := func() error {
jobInitiated, err := transferScheduler.dispatchFinalPart()
if err != nil {
if err == NothingScheduledError {
// No log file needed. Logging begins as a part of awaiting job completion.
return errors.New("nothing found to remove")
}

return err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/sync.go
Expand Up @@ -257,7 +257,7 @@ func (cca *cookedSyncCmdArgs) scanningComplete() bool {
// if blocking is specified to false, then another goroutine spawns and wait out the job
func (cca *cookedSyncCmdArgs) waitUntilJobCompletion(blocking bool) {
// print initial message to indicate that the job is starting
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s/%s.log", azcopyLogPathFolder, cca.jobID)))
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)))

// initialize the times necessary to track progress
cca.jobStartTime = time.Now()
Expand Down
6 changes: 4 additions & 2 deletions cmd/syncEnumerator.go
Expand Up @@ -112,7 +112,8 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
}

jobInitiated, err := transferScheduler.dispatchFinalPart()
if err != nil {
// sync cleanly exits if nothing is scheduled.
if err != nil && err != NothingScheduledError {
return err
}

Expand Down Expand Up @@ -151,7 +152,8 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
// let the deletions happen first
// otherwise if the final part is executed too quickly, we might quit before deletions could finish
jobInitiated, err := transferScheduler.dispatchFinalPart()
if err != nil {
// sync cleanly exits if nothing is scheduled.
if err != nil && err != NothingScheduledError {
return err
}

Expand Down
45 changes: 23 additions & 22 deletions cmd/zc_processor.go
Expand Up @@ -22,8 +22,11 @@ package cmd

import (
"fmt"
"github.com/Azure/azure-storage-azcopy/common"
"net/url"

"github.com/pkg/errors"

"github.com/Azure/azure-storage-azcopy/common"
)

type copyTransferProcessor struct {
Expand Down Expand Up @@ -58,9 +61,11 @@ func newCopyTransferProcessor(copyJobTemplate *common.CopyJobPartOrderRequest, n

func (s *copyTransferProcessor) scheduleCopyTransfer(storedObject storedObject) (err error) {
if len(s.copyJobTemplate.Transfers) == s.numOfTransfersPerPart {
err = s.sendPartToSte()
if err != nil {
return err
resp := s.sendPartToSte()

// TODO: If we ever do launch errors outside of the final "no transfers" error, make them output nicer things here.
if resp.ErrorMsg != "" {
return errors.New(string(resp.ErrorMsg))
}

// reset the transfers buffer
Expand Down Expand Up @@ -89,21 +94,20 @@ func (s *copyTransferProcessor) escapeIfNecessary(path string, shouldEscape bool
return path
}

func (s *copyTransferProcessor) dispatchFinalPart() (copyJobInitiated bool, err error) {
numberOfCopyTransfers := len(s.copyJobTemplate.Transfers)
var NothingScheduledError = errors.New("no transfers were scheduled because no files matched the specified criteria")

// if the number of transfer to copy is 0
// and no part was dispatched, then it means there is no work to do
if s.copyJobTemplate.PartNum == 0 && numberOfCopyTransfers == 0 {
return false, nil
}
func (s *copyTransferProcessor) dispatchFinalPart() (copyJobInitiated bool, err error) {
var resp common.CopyJobPartOrderResponse
s.copyJobTemplate.IsFinalPart = true
resp = s.sendPartToSte()

if numberOfCopyTransfers > 0 {
s.copyJobTemplate.IsFinalPart = true
err = s.sendPartToSte()
if err != nil {
return false, err
if !resp.JobStarted {
if resp.ErrorMsg == common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr() {
return false, NothingScheduledError
}

return false, fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s",
s.copyJobTemplate.JobID, s.copyJobTemplate.PartNum, resp.ErrorMsg)
}

if s.reportFinalPartDispatched != nil {
Expand All @@ -112,18 +116,15 @@ func (s *copyTransferProcessor) dispatchFinalPart() (copyJobInitiated bool, err
return true, nil
}

func (s *copyTransferProcessor) sendPartToSte() error {
// only test the response on the final dispatch to help diagnose root cause of test failures from 0 transfers
func (s *copyTransferProcessor) sendPartToSte() common.CopyJobPartOrderResponse {
var resp common.CopyJobPartOrderResponse
Rpc(common.ERpcCmd.CopyJobPartOrder(), s.copyJobTemplate, &resp)
if !resp.JobStarted {
return fmt.Errorf("copy job part order with JobId %s and part number %d failed to dispatch because %s",
s.copyJobTemplate.JobID, s.copyJobTemplate.PartNum, resp.ErrorMsg)
}

// if the current part order sent to ste is 0, then alert the progress reporting routine
if s.copyJobTemplate.PartNum == 0 && s.reportFirstPartDispatched != nil {
s.reportFirstPartDispatched()
}

return nil
return resp
}
8 changes: 6 additions & 2 deletions cmd/zt_interceptors_for_test.go
Expand Up @@ -42,7 +42,11 @@ func (i *interceptor) intercept(cmd common.RpcCmd, request interface{}, response
i.lastRequest = request

// mock the result
*(response.(*common.CopyJobPartOrderResponse)) = common.CopyJobPartOrderResponse{JobStarted: true}
if len(i.transfers) != 0 || !copyRequest.IsFinalPart {
*(response.(*common.CopyJobPartOrderResponse)) = common.CopyJobPartOrderResponse{JobStarted: true}
} else {
*(response.(*common.CopyJobPartOrderResponse)) = common.CopyJobPartOrderResponse{JobStarted: false, ErrorMsg: common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr()}
}
case common.ERpcCmd.ListJobs():
case common.ERpcCmd.ListJobSummary():
case common.ERpcCmd.ListJobTransfers():
Expand Down Expand Up @@ -88,7 +92,7 @@ func (*mockedLifecycleManager) Prompt(message string, details common.PromptDetai
func (*mockedLifecycleManager) Exit(common.OutputBuilder, common.ExitCode) {}
func (*mockedLifecycleManager) Error(string) {}
func (*mockedLifecycleManager) SurrenderControl() {}
func (*mockedLifecycleManager) InitiateProgressReporting(common.WorkController) {}
func (*mockedLifecycleManager) InitiateProgressReporting(common.WorkController) {}
func (*mockedLifecycleManager) ClearEnvironmentVariable(env common.EnvironmentVariable) {
_ = os.Setenv(env.Name, "")
}
Expand Down
10 changes: 9 additions & 1 deletion common/rpc-models.go
Expand Up @@ -83,8 +83,16 @@ type S3CredentialInfo struct {
Region string
}

type CopyJobPartOrderErrorType string

var ECopyJobPartOrderErrorType CopyJobPartOrderErrorType

func (CopyJobPartOrderErrorType) NoTransfersScheduledErr() CopyJobPartOrderErrorType {
return CopyJobPartOrderErrorType("NoTransfersScheduledErr")
}

type CopyJobPartOrderResponse struct {
ErrorMsg string
ErrorMsg CopyJobPartOrderErrorType
JobStarted bool
}

Expand Down
3 changes: 2 additions & 1 deletion main_windows.go
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"path"
"strings"
"syscall"

"github.com/Azure/azure-storage-azcopy/common"
Expand Down Expand Up @@ -58,7 +59,7 @@ func ProcessOSSpecificInitialization() (int, error) {
func GetAzCopyAppPath() string {
lcm := common.GetLifecycleMgr()
userProfile := lcm.GetEnvironmentVariable(common.EEnvironmentVariable.UserDir())
azcopyAppDataFolder := path.Join(userProfile, ".azcopy")
azcopyAppDataFolder := strings.ReplaceAll(path.Join(userProfile, ".azcopy"), "/", `\`)
if err := os.Mkdir(azcopyAppDataFolder, os.ModeDir); err != nil && !os.IsExist(err) {
return ""
}
Expand Down
5 changes: 5 additions & 0 deletions ste/init.go
Expand Up @@ -141,6 +141,11 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
jppfn := JobsAdmin.NewJobPartPlanFileName(order.JobID, order.PartNum)
jppfn.Create(order) // Convert the order to a plan file
jpm := JobsAdmin.JobMgrEnsureExists(order.JobID, order.LogLevel, order.CommandString) // Get a this job part's job manager (create it if it doesn't exist)

if len(order.Transfers) == 0 && order.IsFinalPart {
jpm.Log(pipeline.LogError, "ERROR: No transfers were scheduled.")
return common.CopyJobPartOrderResponse{JobStarted: false, ErrorMsg: common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr()}
}
// Get credential info from RPC request order, and set in InMemoryTransitJobState.
jpm.setInMemoryTransitJobState(
InMemoryTransitJobState{
Expand Down
2 changes: 1 addition & 1 deletion testSuite/scripts/utility.py
Expand Up @@ -525,7 +525,7 @@ def execute_azcopy_command(command):
universal_newlines=True)
except subprocess.CalledProcessError as exec:
# todo kill azcopy command in case of timeout
# print("command failed with error code " , exec.returncode , " and message " + exec.output)
print("command failed with error code " , exec.returncode , " and message " + exec.output)
return False
else:
return True
Expand Down

0 comments on commit 3011f6d

Please sign in to comment.