From 75db5cbd1a9d6734aeb965c98aa740cedc358ae8 Mon Sep 17 00:00:00 2001 From: Amogh Rathore Date: Wed, 25 Oct 2023 15:47:20 -0700 Subject: [PATCH] Load Managed Daemon images in background (#3984) --- agent/app/agent.go | 49 +++- agent/app/agent_capability.go | 14 +- agent/app/agent_test.go | 47 ++++ agent/app/agent_unix.go | 9 +- agent/app/agent_windows.go | 8 +- agent/ebs/watcher.go | 132 +++++++--- agent/ebs/watcher_test.go | 228 ++++++++++++++++-- agent/engine/daemonmanager/daemon_manager.go | 3 + .../daemonmanager/daemon_manager_linux.go | 5 + .../daemonmanager/daemon_manager_windows.go | 4 + agent/engine/daemonmanager/mock/manager.go | 15 ++ agent/engine/docker_image_manager.go | 2 + agent/utils/utils.go | 14 ++ agent/utils/utils_test.go | 17 ++ .../ecs-agent/logger/field/constants.go | 3 + ecs-agent/logger/field/constants.go | 3 + 16 files changed, 479 insertions(+), 74 deletions(-) diff --git a/agent/app/agent.go b/agent/app/agent.go index f73afd07dc..e3365b726f 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -433,19 +433,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre return exitcodes.ExitTerminal } + // Load Managed Daemon images asynchronously + agent.loadManagedDaemonImagesAsync(imageManager) + scManager := agent.serviceconnectManager scManager.SetECSClient(client, agent.containerInstanceARN) if loaded, _ := scManager.IsLoaded(agent.dockerClient); loaded { imageManager.AddImageToCleanUpExclusionList(agent.serviceconnectManager.GetLoadedImageName()) } - // exclude all daemon images from cleanup - for _, csiDM := range agent.daemonManagers { - if loaded, _ := csiDM.IsLoaded(agent.dockerClient); loaded { - imageManager.AddImageToCleanUpExclusionList(csiDM.GetManagedDaemon().GetLoadedDaemonImageRef()) - } - } - // Add container instance ARN to metadata manager if agent.cfg.ContainerMetadataEnabled.Enabled() { agent.metadataManager.SetContainerInstanceARN(agent.containerInstanceARN) @@ -485,7 +481,7 @@ func 
(agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre agent.startAsyncRoutines(containerChangeEventStream, credentialsManager, imageManager, taskEngine, deregisterInstanceEventStream, client, taskHandler, attachmentEventHandler, state, doctor) // TODO add EBS watcher to async routines - agent.startEBSWatcher(state, taskEngine) + agent.startEBSWatcher(state, taskEngine, agent.dockerClient) // Start the acs session, which should block doStart return agent.startACSSession(credentialsManager, taskEngine, deregisterInstanceEventStream, client, state, taskHandler, doctor) @@ -751,6 +747,38 @@ func (agent *ecsAgent) constructVPCSubnetAttributes() []*ecs.Attribute { } } +// Loads Managed Daemon images for all Managed Daemons registered on the Agent. +// The images are loaded in the background. Successfully loaded images are added to +// imageManager's cleanup exclusion list. +func (agent *ecsAgent) loadManagedDaemonImagesAsync(imageManager engine.ImageManager) { + daemonManagers := agent.getDaemonManagers() + logger.Debug(fmt.Sprintf("Will load images for %d Managed Daemons", len(daemonManagers))) + for _, daemonManager := range daemonManagers { + go agent.loadManagedDaemonImage(daemonManager, imageManager) + } +} + +// Loads Managed Daemon image and adds it to image cleanup exclusion list upon success. 
+func (agent *ecsAgent) loadManagedDaemonImage(dm dm.DaemonManager, imageManager engine.ImageManager) { + imageRef := dm.GetManagedDaemon().GetImageRef() + logger.Info("Starting to load Managed Daemon image", logger.Fields{ + field.ImageRef: imageRef, + }) + image, err := dm.LoadImage(agent.ctx, agent.dockerClient) + if err != nil { + logger.Error("Failed to load Managed Daemon image", logger.Fields{ + field.ImageRef: imageRef, + field.Error: err, + }) + return + } + logger.Info("Successfully loaded Managed Daemon image", logger.Fields{ + field.ImageRef: imageRef, + field.ImageID: image.ID, + }) + imageManager.AddImageToCleanUpExclusionList(imageRef) +} + // registerContainerInstance registers the container instance ID for the ECS Agent func (agent *ecsAgent) registerContainerInstance( client api.ECSClient, @@ -1140,6 +1168,11 @@ func (agent *ecsAgent) setDaemonManager(key string, val dm.DaemonManager) { agent.daemonManagers[key] = val } +// Returns daemon managers map. Not designed to be thread-safe. +func (agent *ecsAgent) getDaemonManagers() map[string]dm.DaemonManager { + return agent.daemonManagers +} + // setVPCSubnet sets the vpc and subnet ids for the agent by querying the // instance metadata service func (agent *ecsAgent) setVPCSubnet() (error, bool) { diff --git a/agent/app/agent_capability.go b/agent/app/agent_capability.go index 0c812ba41e..6742960941 100644 --- a/agent/app/agent_capability.go +++ b/agent/app/agent_capability.go @@ -522,12 +522,16 @@ func (agent *ecsAgent) appendEBSTaskAttachCapabilities(capabilities []*ecs.Attri if daemonDef.GetImageName() == md.EbsCsiDriver { csiDaemonManager := dm.NewDaemonManager(daemonDef) agent.setDaemonManager(md.EbsCsiDriver, csiDaemonManager) - if _, err := csiDaemonManager.LoadImage(agent.ctx, agent.dockerClient); err != nil { - logger.Error("Failed to load the EBS CSI Driver. 
This container instance will not be able to support EBS Task Attach", + imageExists, err := csiDaemonManager.ImageExists() + if !imageExists { + logger.Error( + "Either EBS Daemon image does not exist or failed to check its existence."+ + " This container instance will not advertise EBS Task Attach capability.", logger.Fields{ - field.Error: err, - }, - ) + field.ImageName: csiDaemonManager.GetManagedDaemon().GetImageName(), + field.ImageTARPath: csiDaemonManager.GetManagedDaemon().GetImageTarPath(), + field.Error: err, + }) return capabilities } } diff --git a/agent/app/agent_test.go b/agent/app/agent_test.go index 5a3f9328ad..e3346b27f6 100644 --- a/agent/app/agent_test.go +++ b/agent/app/agent_test.go @@ -59,6 +59,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" aws_credentials "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/docker/docker/api/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1859,3 +1860,49 @@ func TestWaitUntilInstanceInServicePolling(t *testing.T) { }) } } + +func TestLoadManagedDaemonImage(t *testing.T) { + tcs := []struct { + name string + setDaemonManagerExpectations func(*mock_daemonmanager.MockDaemonManager) + setImageManagerExpectations func(*mock_engine.MockImageManager) + }{ + { + name: "no exclusion list update if image load fails", + setDaemonManagerExpectations: func(mdm *mock_daemonmanager.MockDaemonManager) { + mdm.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")).Times(2) + mdm.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, errors.New("error")) + }, + }, + { + name: "exclusion list is updated if image load succeeds", + setDaemonManagerExpectations: func(mdm *mock_daemonmanager.MockDaemonManager) { + mdm.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")).Times(3) + mdm.EXPECT(). + LoadImage(gomock.Any(), gomock.Any()). 
+ Return(&types.ImageInspect{ID: "image-id"}, nil) + }, + setImageManagerExpectations: func(mim *mock_engine.MockImageManager) { + mim.EXPECT().AddImageToCleanUpExclusionList("name:tag") + }, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ctrl.Finish() + + dockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + daemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl) + imageManager := mock_engine.NewMockImageManager(ctrl) + + tc.setDaemonManagerExpectations(daemonManager) + if tc.setImageManagerExpectations != nil { + tc.setImageManagerExpectations(imageManager) + } + + agent := &ecsAgent{ctx: context.Background(), dockerClient: dockerClient} + agent.loadManagedDaemonImage(daemonManager, imageManager) + }) + } +} diff --git a/agent/app/agent_unix.go b/agent/app/agent_unix.go index b3c1429289..a653eb9836 100644 --- a/agent/app/agent_unix.go +++ b/agent/app/agent_unix.go @@ -21,6 +21,7 @@ import ( asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory" "github.com/aws/amazon-ecs-agent/agent/config" + "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" ebs "github.com/aws/amazon-ecs-agent/agent/ebs" "github.com/aws/amazon-ecs-agent/agent/ecscni" "github.com/aws/amazon-ecs-agent/agent/engine" @@ -151,10 +152,14 @@ func (agent *ecsAgent) startENIWatcher(state dockerstate.TaskEngineState, stateC return nil } -func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, taskEngine engine.TaskEngine) { +func (agent *ecsAgent) startEBSWatcher( + state dockerstate.TaskEngineState, + taskEngine engine.TaskEngine, + dockerClient dockerapi.DockerClient, +) { if agent.ebsWatcher == nil { seelog.Debug("Creating new EBS watcher...") - agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, taskEngine) + agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, taskEngine, dockerClient) go agent.ebsWatcher.Start() } } diff --git a/agent/app/agent_windows.go b/agent/app/agent_windows.go 
index a7a87f8195..8e263bfbf1 100644 --- a/agent/app/agent_windows.go +++ b/agent/app/agent_windows.go @@ -24,6 +24,7 @@ import ( asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory" "github.com/aws/amazon-ecs-agent/agent/data" + "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" "github.com/aws/amazon-ecs-agent/agent/ecscni" "github.com/aws/amazon-ecs-agent/agent/engine" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" @@ -99,9 +100,12 @@ func (agent *ecsAgent) startWindowsService() int { return 0 } -func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, taskEngine engine.TaskEngine) error { +func (agent *ecsAgent) startEBSWatcher( + state dockerstate.TaskEngineState, + taskEngine engine.TaskEngine, + dockerClient dockerapi.DockerClient, +) { seelog.Debug("Windows EBS Watcher not implemented: No Op") - return nil } // handler implements https://godoc.org/golang.org/x/sys/windows/svc#Handler diff --git a/agent/ebs/watcher.go b/agent/ebs/watcher.go index 2c8aaff13d..5fe069910e 100644 --- a/agent/ebs/watcher.go +++ b/agent/ebs/watcher.go @@ -22,10 +22,14 @@ import ( "time" ecsapi "github.com/aws/amazon-ecs-agent/agent/api" + "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" ecsengine "github.com/aws/amazon-ecs-agent/agent/engine" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource" + "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" csi "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" md "github.com/aws/amazon-ecs-agent/ecs-agent/manageddaemon" log "github.com/cihub/seelog" @@ -47,13 +51,15 @@ type EBSWatcher struct { csiClient csi.CSIClient scanTicker *time.Ticker // TODO: The dockerTaskEngine.stateChangeEvent will be used to send over the state change event for EBS attachments once it's been found 
and mounted/resize/format. - taskEngine ecsengine.TaskEngine + taskEngine ecsengine.TaskEngine + dockerClient dockerapi.DockerClient } // NewWatcher is used to return a new instance of the EBSWatcher struct func NewWatcher(ctx context.Context, state dockerstate.TaskEngineState, - taskEngine ecsengine.TaskEngine) *EBSWatcher { + taskEngine ecsengine.TaskEngine, + dockerClient dockerapi.DockerClient) *EBSWatcher { derivedContext, cancel := context.WithCancel(ctx) discoveryClient := apiebs.NewDiscoveryClient(derivedContext) // TODO pull this socket out into config @@ -65,6 +71,7 @@ func NewWatcher(ctx context.Context, discoveryClient: discoveryClient, csiClient: &csiClient, taskEngine: taskEngine, + dockerClient: dockerClient, } } @@ -77,13 +84,7 @@ func (w *EBSWatcher) Start() { for { select { case <-w.scanTicker.C: - pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey() - if len(pendingEBS) > 0 { - foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient) - w.overrideDeviceName(foundVolumes) - w.StageAll(foundVolumes) - w.NotifyAttached(foundVolumes) - } + w.tick() case <-w.ctx.Done(): w.scanTicker.Stop() log.Info("EBS Watcher Stopped due to agent stop") @@ -92,6 +93,93 @@ func (w *EBSWatcher) Start() { } } +// Method to handle watcher's tick. +// If there are no pending EBS volume attachments in agent state, then this method is a no-op. +// If there are pending EBS volume attachments in agent state then this method will ensure +// that EBS Managed Daemon is running and then scan the host for the EBS volumes and process +// the ones that are found. +func (w *EBSWatcher) tick() { + pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey() + if len(pendingEBS) <= 0 { + return + } + if !w.daemonRunning() { + log.Info("EBS Managed Daemon is not currently running. 
Skipping EBS Watcher tick.") + return + } + foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient) + w.overrideDeviceName(foundVolumes) + w.StageAll(foundVolumes) + w.NotifyAttached(foundVolumes) +} + +// Checks if EBS Daemon Task is running and starts a new one it if it's not. +func (w *EBSWatcher) daemonRunning() bool { + csiTask := w.taskEngine.GetDaemonTask(md.EbsCsiDriver) + + // Check if task is running or about to run + if csiTask != nil && csiTask.GetKnownStatus() == status.TaskRunning { + logger.Debug("EBS Managed Daemon is running", logger.Fields{field.TaskID: csiTask.GetID()}) + return true + } + if csiTask != nil && csiTask.GetKnownStatus() < status.TaskRunning { + logger.Debug("EBS Managed Daemon task is pending transitioning to running", logger.Fields{ + field.TaskID: csiTask.GetID(), + field.KnownStatus: csiTask.GetKnownStatus(), + }) + return false + } + + // Task is neither running nor about to run. We need to start a new one. + + if csiTask == nil { + logger.Info("EBS Managed Daemon task has not been initialized. Will start a new one.") + } else { + logger.Info("EBS Managed Daemon task is beyond running state. Will start a new one.", logger.Fields{ + field.TaskID: csiTask.GetID(), + field.KnownStatus: csiTask.GetKnownStatus(), + }) + } + + ebsCsiDaemonManager, ok := w.taskEngine.GetDaemonManagers()[md.EbsCsiDriver] + if !ok { + log.Errorf("EBS Daemon Manager is not Initialized. EBS Task Attach is not supported.") + return false + } + + // Check if Managed Daemon image has been loaded. 
+ imageLoaded, err := ebsCsiDaemonManager.IsLoaded(w.dockerClient) + if !imageLoaded { + logger.Info("Image is not loaded yet so can't start a Managed Daemon task.", logger.Fields{ + "ImageRef": ebsCsiDaemonManager.GetManagedDaemon().GetImageRef(), + field.Error: err, + }) + return false + } + logger.Debug("Managed Daemon image has been loaded", logger.Fields{ + "ImageRef": ebsCsiDaemonManager.GetManagedDaemon().GetImageRef(), + }) + + // Create a new Managed Daemon task. + csiTask, err = ebsCsiDaemonManager.CreateDaemonTask() + if err != nil { + // Failed to create the task. There is nothing that the watcher can do at this time + // so swallow the error and try again later. + logger.Error("Failed to create EBS Managed Daemon task.", logger.Fields{field.Error: err}) + return false + } + + // Add the new task to task engine. + w.taskEngine.SetDaemonTask(md.EbsCsiDriver, csiTask) + w.taskEngine.AddTask(csiTask) + logger.Info("Added EBS Managed Daemon task to task engine", logger.Fields{ + field.TaskID: csiTask.GetID(), + }) + + // Task is not confirmed to be running yet, so return false. + return false +} + // Stop will stop the EBS watcher func (w *EBSWatcher) Stop() { log.Info("Stopping EBS watcher.") @@ -107,8 +195,7 @@ func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) { // HandleResourceAttachment processes the resource attachment message. It will: // 1. Check whether we already have this attachment in state and if so it's a noop. -// 2. Start the EBS CSI driver if it's not already running -// 3. Otherwise add the attachment to state, start its ack timer, and save to the agent state. +// 2. Otherwise add the attachment to state, start its ack timer, and save to the agent state. 
func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment) error { attachmentType := ebs.GetAttachmentType() if attachmentType != apiebs.EBSTaskAttach { @@ -125,29 +212,6 @@ func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment) }) } - // start EBS CSI Driver Managed Daemon - // We want to avoid creating a new CSI driver task if there's already one that's not been stopped. - if runningCsiTask := w.taskEngine.GetDaemonTask(md.EbsCsiDriver); runningCsiTask != nil && !runningCsiTask.GetKnownStatus().Terminal() { - log.Debugf("engine ebs CSI driver is running with taskID: %v", runningCsiTask.GetID()) - } else { - if ebsCsiDaemonManager, ok := w.taskEngine.GetDaemonManagers()[md.EbsCsiDriver]; ok { - if csiTask, err := ebsCsiDaemonManager.CreateDaemonTask(); err != nil { - // fail attachment and return - log.Errorf("Unable to start ebsCsiDaemon in the engine: error: %s", err) - if csiTask != nil { - log.Errorf("CSI task Error task ID: %s", csiTask.GetID()) - } - return err - } else { - w.taskEngine.SetDaemonTask(md.EbsCsiDriver, csiTask) - w.taskEngine.AddTask(csiTask) - log.Infof("task_engine: Added EBS CSI task to engine") - } - } else { - log.Errorf("CSI Driver is not Initialized") - } - } - if err := w.addEBSAttachmentToState(ebs); err != nil { return fmt.Errorf("%w; attach %v message handler: unable to add ebs attachment to engine state: %v", err, attachmentType, ebs.EBSToString()) diff --git a/agent/ebs/watcher_test.go b/agent/ebs/watcher_test.go index f93112cbcb..765ec002c2 100644 --- a/agent/ebs/watcher_test.go +++ b/agent/ebs/watcher_test.go @@ -18,17 +18,20 @@ package ebs import ( "context" + "errors" "fmt" "path/filepath" "sync" "testing" "time" - apitask "github.com/aws/amazon-ecs-agent/agent/api/task" + "github.com/aws/amazon-ecs-agent/agent/api/task" + mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks" "github.com/aws/amazon-ecs-agent/agent/engine" - dm 
"github.com/aws/amazon-ecs-agent/agent/engine/daemonmanager" - mock_dm "github.com/aws/amazon-ecs-agent/agent/engine/daemonmanager/mock" + "github.com/aws/amazon-ecs-agent/agent/engine/daemonmanager" + mock_daemonmanager "github.com/aws/amazon-ecs-agent/agent/engine/daemonmanager/mock" "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate" + mock_dockerstate "github.com/aws/amazon-ecs-agent/agent/engine/dockerstate/mocks" mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" statechange "github.com/aws/amazon-ecs-agent/agent/statechange" taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" @@ -36,7 +39,7 @@ import ( "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment" apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource" mock_ebs_discovery "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment/resource/mocks" - apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" + "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" csi "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient" mock_csiclient "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient/mocks" md "github.com/aws/amazon-ecs-agent/ecs-agent/manageddaemon" @@ -151,7 +154,6 @@ func TestHandleEBSAttachmentHappyCase(t *testing.T) { // TestHandleExpiredEBSAttachment tests acknowledging an expired resource attachment of type Elastic Block Stores // The resource attachment object should not be saved to the agent state since the expiration date is in the past. 
func TestHandleExpiredEBSAttachment(t *testing.T) { - t.Skip() mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -159,9 +161,6 @@ func TestHandleExpiredEBSAttachment(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil).AnyTimes() - mockTaskEngine.EXPECT().GetDaemonManagers().Return(nil).AnyTimes() - mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl) testAttachmentProperties := map[string]string{ @@ -310,8 +309,6 @@ func TestHandleInvalidTypeEBSAttachment(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil).AnyTimes() - mockTaskEngine.EXPECT().GetDaemonManagers().Return(nil).AnyTimes() mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl) testAttachmentProperties := map[string]string{ @@ -357,8 +354,6 @@ func TestHandleEBSAckTimeout(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil).AnyTimes() - mockTaskEngine.EXPECT().GetDaemonManagers().Return(nil).AnyTimes() mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl) testAttachmentProperties := map[string]string{ @@ -394,7 +389,6 @@ func TestHandleEBSAckTimeout(t *testing.T) { // TestHandleMismatchEBSAttachment tests handling an EBS attachment but found a different volume attached // onto the host during the scanning process. 
func TestHandleMismatchEBSAttachment(t *testing.T) { - t.Skip() mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -402,8 +396,6 @@ func TestHandleMismatchEBSAttachment(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil).AnyTimes() - mockTaskEngine.EXPECT().GetDaemonManagers().Return(nil).AnyTimes() mockCsiClient := mock_csiclient.NewMockCSIClient(mockCtrl) watcher := newTestEBSWatcher(ctx, taskEngineState, mockDiscoveryClient, mockTaskEngine, mockCsiClient) @@ -462,9 +454,9 @@ func TestHandleEBSAttachmentWithExistingCSIDriverTask(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(&apitask.Task{ + mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(&task.Task{ Arn: "arn:aws:ecs:us-east-1:012345678910:task/some-task-id", - KnownStatusUnsafe: apitaskstatus.TaskRunning, + KnownStatusUnsafe: status.TaskRunning, }).AnyTimes() mockTaskEngine.EXPECT().StateChangeEvents().Return(make(chan statechange.Event)).AnyTimes() @@ -547,17 +539,17 @@ func TestHandleEBSAttachmentWithStoppedCSIDriverTask(t *testing.T) { taskEngineState := dockerstate.NewTaskEngineState() mockDiscoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(mockCtrl) mockTaskEngine := mock_engine.NewMockTaskEngine(mockCtrl) - mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(&apitask.Task{ + mockTaskEngine.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(&task.Task{ Arn: "arn:aws:ecs:us-east-1:012345678910:task/some-task-id", - KnownStatusUnsafe: apitaskstatus.TaskStopped, + KnownStatusUnsafe: status.TaskStopped, }).AnyTimes() - mockDaemonManager := 
mock_dm.NewMockDaemonManager(mockCtrl) - mockDaemonManager.EXPECT().CreateDaemonTask().Return(&apitask.Task{ + mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(mockCtrl) + mockDaemonManager.EXPECT().CreateDaemonTask().Return(&task.Task{ Arn: "arn:aws:ecs:us-east-1:012345678910:task/some-task-id", - KnownStatusUnsafe: apitaskstatus.TaskCreated, + KnownStatusUnsafe: status.TaskCreated, }, nil).AnyTimes() - daemonManagers := map[string]dm.DaemonManager{ + daemonManagers := map[string]daemonmanager.DaemonManager{ md.EbsCsiDriver: mockDaemonManager, } @@ -635,3 +627,193 @@ func TestHandleEBSAttachmentWithStoppedCSIDriverTask(t *testing.T) { } // TODO add StageAll test + +func TestDaemonRunning(t *testing.T) { + tcs := []struct { + name string + setTaskEngineMocks func(*gomock.Controller, *mock_engine.MockTaskEngine) + expected bool + }{ + { + name: "task is running", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + task := &task.Task{ + KnownStatusUnsafe: status.TaskRunning, + Arn: "arn:aws:ecs:us-west-2:1234:task/test/sometaskid", + } + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(task) + }, + expected: true, + }, + { + name: "task is created", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + task := &task.Task{ + KnownStatusUnsafe: status.TaskCreated, + Arn: "arn:aws:ecs:us-west-2:1234:task/test/sometaskid", + } + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(task) + }, + expected: false, + }, + { + name: "task is not created, daemon manager not found", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil) + mte.EXPECT().GetDaemonManagers().Return(map[string]daemonmanager.DaemonManager{}) + }, + expected: false, + }, + { + name: "image not loaded yet", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + 
mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil) + daemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl) + daemonManager.EXPECT().IsLoaded(gomock.Any()).Return(false, nil) + daemonManager.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")) + dms := map[string]daemonmanager.DaemonManager{md.EbsCsiDriver: daemonManager} + mte.EXPECT().GetDaemonManagers().Return(dms) + }, + expected: false, + }, + { + name: "daemon task create failed", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil) + daemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl) + daemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil) + daemonManager.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")) + dms := map[string]daemonmanager.DaemonManager{md.EbsCsiDriver: daemonManager} + mte.EXPECT().GetDaemonManagers().Return(dms) + daemonManager.EXPECT().CreateDaemonTask().Return(nil, errors.New("error")) + }, + expected: false, + }, + { + name: "new daemon task created successfully", + setTaskEngineMocks: func(ctrl *gomock.Controller, mte *mock_engine.MockTaskEngine) { + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(nil) + daemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl) + daemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil) + daemonManager.EXPECT().GetManagedDaemon().Return(md.NewManagedDaemon("name", "tag")) + dms := map[string]daemonmanager.DaemonManager{md.EbsCsiDriver: daemonManager} + mte.EXPECT().GetDaemonManagers().Return(dms) + csiTask := &task.Task{Arn: "arn:aws:ecs:us-west-2:1234:task/test/sometaskid"} + daemonManager.EXPECT().CreateDaemonTask().Return(csiTask, nil) + mte.EXPECT().SetDaemonTask(md.EbsCsiDriver, csiTask).Return() + mte.EXPECT().AddTask(csiTask).Return() + }, + expected: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + 
defer ctrl.Finish() + + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + tc.setTaskEngineMocks(ctrl, taskEngine) + + watcher := &EBSWatcher{taskEngine: taskEngine} + assert.Equal(t, tc.expected, watcher.daemonRunning()) + }) + } +} + +func TestTick(t *testing.T) { + type testCase struct { + name string + pendingAttachments map[string]*apiebs.ResourceAttachment + setTaskEngineStateExpectations func(*mock_dockerstate.MockTaskEngineState) + setTaskEngineExpectations func(*mock_engine.MockTaskEngine) + setDiscoveryClientExpectations func(*mock_ebs_discovery.MockEBSDiscovery) + assertEBSAttachmentsState func(t *testing.T, atts map[string]*apiebs.ResourceAttachment) + } + + attachmentAlreadySentCase := func() testCase { + attachment := &apiebs.ResourceAttachment{ + AttachmentInfo: attachment.AttachmentInfo{ + Status: attachment.AttachmentAttached, + AttachStatusSent: true, + }, + AttachmentProperties: map[string]string{apiebs.DeviceNameKey: "device-name"}, + } + return testCase{ + name: "daemon running and volume already attached and sent", + pendingAttachments: map[string]*apiebs.ResourceAttachment{"ebs-volume:id": attachment}, + setTaskEngineStateExpectations: func(mtes *mock_dockerstate.MockTaskEngineState) { + mtes.EXPECT().GetEBSByVolumeId("id").Return(attachment, true).Times(3) + }, + setTaskEngineExpectations: func(mte *mock_engine.MockTaskEngine) { + task := &task.Task{KnownStatusUnsafe: status.TaskRunning} + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(task) + }, + setDiscoveryClientExpectations: func(me *mock_ebs_discovery.MockEBSDiscovery) { + me.EXPECT().ConfirmEBSVolumeIsAttached("device-name", "id").Return("actual-name", nil) + }, + assertEBSAttachmentsState: func(t *testing.T, atts map[string]*apiebs.ResourceAttachment) { + attachment, ok := atts["ebs-volume:id"] + require.True(t, ok, "attachment not found") + assert.NoError(t, attachment.GetError()) + assert.True(t, attachment.IsAttached()) + assert.True(t, attachment.IsSent()) + }, + } + } + + 
tcs := []testCase{ + { + name: "no-op when there are no pending attachments", + pendingAttachments: map[string]*apiebs.ResourceAttachment{}, + }, + { + name: "no-op when daemon has been initialized but pending to run", + pendingAttachments: map[string]*apiebs.ResourceAttachment{"id": &apiebs.ResourceAttachment{}}, + setTaskEngineExpectations: func(mte *mock_engine.MockTaskEngine) { + task := &task.Task{KnownStatusUnsafe: status.TaskCreated} + mte.EXPECT().GetDaemonTask(md.EbsCsiDriver).Return(task) + }, + assertEBSAttachmentsState: func(t *testing.T, atts map[string]*apiebs.ResourceAttachment) { + attachment, ok := atts["id"] + require.True(t, ok, "attachment not found") + assert.NoError(t, attachment.GetError()) + assert.False(t, attachment.IsAttached()) + assert.False(t, attachment.IsSent()) + }, + }, + attachmentAlreadySentCase(), + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + taskEngineState := mock_dockerstate.NewMockTaskEngineState(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + dockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + discoveryClient := mock_ebs_discovery.NewMockEBSDiscovery(ctrl) + + attachments := tc.pendingAttachments + taskEngineState.EXPECT().GetAllPendingEBSAttachmentsWithKey().Return(attachments) + if tc.setTaskEngineStateExpectations != nil { + tc.setTaskEngineStateExpectations(taskEngineState) + } + if tc.setTaskEngineExpectations != nil { + tc.setTaskEngineExpectations(taskEngine) + } + if tc.setDiscoveryClientExpectations != nil { + tc.setDiscoveryClientExpectations(discoveryClient) + } + + watcher := NewWatcher(context.Background(), taskEngineState, taskEngine, dockerClient) + watcher.discoveryClient = discoveryClient + watcher.tick() + + if tc.assertEBSAttachmentsState != nil { + tc.assertEBSAttachmentsState(t, attachments) + } + }) + } +} diff --git a/agent/engine/daemonmanager/daemon_manager.go 
b/agent/engine/daemonmanager/daemon_manager.go index a1c98b48a2..ad3ffaf496 100644 --- a/agent/engine/daemonmanager/daemon_manager.go +++ b/agent/engine/daemonmanager/daemon_manager.go @@ -25,6 +25,9 @@ import ( type DaemonManager interface { GetManagedDaemon() *md.ManagedDaemon CreateDaemonTask() (*apitask.Task, error) + // Returns true if the Daemon image is found on this host, false otherwise. + // Error is returned when something goes wrong when looking for the image. + ImageExists() (bool, error) LoadImage(ctx context.Context, dockerClient dockerapi.DockerClient) (*types.ImageInspect, error) IsLoaded(dockerClient dockerapi.DockerClient) (bool, error) } diff --git a/agent/engine/daemonmanager/daemon_manager_linux.go b/agent/engine/daemonmanager/daemon_manager_linux.go index 0829dbdc27..86df0277ba 100644 --- a/agent/engine/daemonmanager/daemon_manager_linux.go +++ b/agent/engine/daemonmanager/daemon_manager_linux.go @@ -171,6 +171,11 @@ func (dm *daemonManager) initDaemonDirectoryMounts(imageName string) error { return nil } +// Returns true if the Daemon image is found on this host, false otherwise. 
+func (dm *daemonManager) ImageExists() (bool, error) { + return utils.FileExists(dm.managedDaemon.GetImageTarPath()) +} + // LoadImage loads the daemon's latest image func (dm *daemonManager) LoadImage(ctx context.Context, dockerClient dockerapi.DockerClient) (*types.ImageInspect, error) { var loadErr error diff --git a/agent/engine/daemonmanager/daemon_manager_windows.go b/agent/engine/daemonmanager/daemon_manager_windows.go index 4524b8206c..0f027be61d 100644 --- a/agent/engine/daemonmanager/daemon_manager_windows.go +++ b/agent/engine/daemonmanager/daemon_manager_windows.go @@ -37,3 +37,7 @@ func (dm *daemonManager) LoadImage(ctx context.Context, dockerClient dockerapi.D func (dm *daemonManager) IsLoaded(dockerClient dockerapi.DockerClient) (bool, error) { return false, errors.New("daemonmanager.IsLoaded not implemented for Windows") } + +func (dm *daemonManager) ImageExists() (bool, error) { + return false, errors.New("daemonmanager.ImageExists not implemented for Windows") +} diff --git a/agent/engine/daemonmanager/mock/manager.go b/agent/engine/daemonmanager/mock/manager.go index 64660aaca7..4dc7efc59c 100644 --- a/agent/engine/daemonmanager/mock/manager.go +++ b/agent/engine/daemonmanager/mock/manager.go @@ -81,6 +81,21 @@ func (mr *MockDaemonManagerMockRecorder) GetManagedDaemon() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetManagedDaemon", reflect.TypeOf((*MockDaemonManager)(nil).GetManagedDaemon)) } +// ImageExists mocks base method. +func (m *MockDaemonManager) ImageExists() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ImageExists") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ImageExists indicates an expected call of ImageExists. 
+func (mr *MockDaemonManagerMockRecorder) ImageExists() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImageExists", reflect.TypeOf((*MockDaemonManager)(nil).ImageExists)) +} + // IsLoaded mocks base method. func (m *MockDaemonManager) IsLoaded(arg0 dockerapi.DockerClient) (bool, error) { m.ctrl.T.Helper() diff --git a/agent/engine/docker_image_manager.go b/agent/engine/docker_image_manager.go index 31b981f791..a6841ce797 100644 --- a/agent/engine/docker_image_manager.go +++ b/agent/engine/docker_image_manager.go @@ -113,6 +113,8 @@ func buildImageCleanupExclusionList(cfg *config.Config) []string { } func (imageManager *dockerImageManager) AddImageToCleanUpExclusionList(image string) { + imageManager.updateLock.Lock() + defer imageManager.updateLock.Unlock() imageManager.imageCleanupExclusionList = append(imageManager.imageCleanupExclusionList, image) logger.Info("Image excluded from cleanup", logger.Fields{ field.Image: image, diff --git a/agent/utils/utils.go b/agent/utils/utils.go index 3b7ae0e6f4..fab02eecce 100644 --- a/agent/utils/utils.go +++ b/agent/utils/utils.go @@ -18,9 +18,11 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "io/fs" "io/ioutil" "math" "math/big" + "os" "path/filepath" "reflect" "strconv" @@ -230,3 +232,15 @@ func GetAttachmentId(attachmentArn string) (string, error) { } return fields[len(fields)-1], nil } + +// FileExists checks if a file exists at the provided file path.
+func FileExists(filePath string) (bool, error) { + _, err := os.Stat(filePath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} diff --git a/agent/utils/utils_test.go b/agent/utils/utils_test.go index 947aa8aafd..0f3d6b77d9 100644 --- a/agent/utils/utils_test.go +++ b/agent/utils/utils_test.go @@ -18,8 +18,11 @@ package utils import ( "errors" + "fmt" + "os" "sort" "testing" + "time" "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/aws-sdk-go/aws" @@ -195,3 +198,17 @@ func TestGetAttachmentId(t *testing.T) { _, err = GetAttachmentId("invalid") assert.Error(t, err) } + +func TestFileExists(t *testing.T) { + t.Run("file is found", func(t *testing.T) { + file, err := os.CreateTemp("", "file_exists_test") + res, err := FileExists(file.Name()) + assert.NoError(t, err) + assert.True(t, res) + }) + t.Run("file is not found", func(t *testing.T) { + res, err := FileExists(fmt.Sprintf("test_file_exists_%d", time.Now().Unix())) + assert.NoError(t, err) + assert.False(t, res) + }) +} diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go index b3ae496ac6..ed3f732fc4 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field/constants.go @@ -40,6 +40,9 @@ const ( ServiceName = "ServiceName" TaskProtection = "TaskProtection" ImageID = "imageID" + ImageName = "imageName" + ImageRef = "imageRef" + ImageTARPath = "imageTARPath" ImageNames = "imageNames" ImageSizeBytes = "imageSizeBytes" ImagePulledAt = "imagePulledAt" diff --git a/ecs-agent/logger/field/constants.go b/ecs-agent/logger/field/constants.go index b3ae496ac6..ed3f732fc4 100644 --- a/ecs-agent/logger/field/constants.go +++ b/ecs-agent/logger/field/constants.go @@ -40,6 +40,9 @@ const ( 
ServiceName = "ServiceName" TaskProtection = "TaskProtection" ImageID = "imageID" + ImageName = "imageName" + ImageRef = "imageRef" + ImageTARPath = "imageTARPath" ImageNames = "imageNames" ImageSizeBytes = "imageSizeBytes" ImagePulledAt = "imagePulledAt"