Skip to content

Commit

Permalink
Adding EBS watcher implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Aug 25, 2023
1 parent 1ec6a70 commit 5266fda
Show file tree
Hide file tree
Showing 21 changed files with 1,233 additions and 9 deletions.
212 changes: 212 additions & 0 deletions agent/ebs/watcher.go
@@ -0,0 +1,212 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package ebs

import (
"context"
"sync"
"time"

"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/statechange"
apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
log "github.com/cihub/seelog"
)

type EBSWatcher struct {
ctx context.Context
cancel context.CancelFunc
scanTicker *time.Ticker
agentState dockerstate.TaskEngineState
// TODO: The dataClient will be used to save to agent's data client as well as start the ACK timer. This will be added once the data client functionality have been added
// dataClient data.Client
ebsChangeEvent chan<- statechange.Event
discoveryClient apiebs.EBSDiscovery
mailbox chan func()
}

// NewWatcher is used to return a new instance of the EBSWatcher struct
func NewWatcher(ctx context.Context,
state dockerstate.TaskEngineState,
stateChangeEvents chan<- statechange.Event) (*EBSWatcher, error) {
derivedContext, cancel := context.WithCancel(ctx)
discoveryClient := apiebs.NewDiscoveryClient(derivedContext)
return &EBSWatcher{
ctx: derivedContext,
cancel: cancel,
agentState: state,
ebsChangeEvent: stateChangeEvents,
discoveryClient: discoveryClient,
mailbox: make(chan func(), 100),
}, nil
}

// Start is used to kick off the periodic scanning process of the EBS volume attachments for the EBS watcher.
// If there aren't any initially, the scan ticker will stop.
func (w *EBSWatcher) Start() {
log.Info("Starting EBS watcher.")

w.scanTicker = time.NewTicker(apiebs.ScanPeriod)
if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
w.scanTicker.Stop()
}

for {
select {
case f := <-w.mailbox:
f()
case <-w.scanTicker.C:
pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey()
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.NotifyFound(foundVolumes)
case <-w.ctx.Done():
w.scanTicker.Stop()
log.Info("EBS Watcher Stopped")
return
}
}
}

// Stop will stop the EBS watcher
func (w *EBSWatcher) Stop() {
log.Info("Stopping EBS watcher.")
w.cancel()
}

// HandleResourceAttachment processes the resource attachment message.
func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) error {
var err error
var wg sync.WaitGroup
wg.Add(1)
w.mailbox <- func() {
defer wg.Done()
empty := len(w.agentState.GetAllPendingEBSAttachments()) == 0

err := w.handleEBSAttachment(ebs)
if err != nil {
log.Warnf("Failed to handle resource attachment %v", ebs.String())
}
if empty && len(w.agentState.GetAllPendingEBSAttachments()) == 1 {
w.scanTicker.Stop()
w.scanTicker = time.NewTicker(apiebs.ScanPeriod)
}
}
wg.Wait()
return err
}

// handleEBSAttachment will handle an EBS attachment via the following:
// 1. Check whether we already have this attachment in state, if so, return
// 2. Otherwise add the attachment to state, start its ack timer, and save the state
func (w *EBSWatcher) handleEBSAttachment(ebs *apiebs.ResourceAttachment) error {
if ebs.AttachmentProperties[apiebs.ResourceTypeName] != apiebs.ElasticBlockStorage {
log.Warn("Resource type not Elastic Block Storage. Skip handling resource attachment.")
return nil
}
volumeID := ebs.AttachmentProperties[apiebs.VolumeIdName]
_, ok := w.agentState.GetEBSByVolumeId(volumeID)
if ok {
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.String())
return nil
}

if err := w.addEBSAttachmentToState(ebs); err != nil {
return err
}
return nil
}

func (w *EBSWatcher) NotifyFound(foundVolumes []string) {
for _, volumeId := range foundVolumes {
w.notifyFoundEBS((volumeId))
}
}

// notifyFoundEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyFoundEBS(volumeId string) {
w.mailbox <- func() {
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Unable to find EBS volume with volume ID: %v.", volumeId)
return
}

if ebs.HasExpired() {
log.Warnf("EBS status expired, no longer tracking EBS volume: %v.", ebs.String())
return
}

if ebs.IsSent() {
log.Warnf("State change event has already been emitted for EBS volume: %v.", ebs.String())
return
}

if ebs.IsAttached() {
log.Infof("EBS volume: %v, has already been found. Skip notifying EBS volume.", ebs.String())
return
}

ebs.StopAckTimer()
ebs.SetAttachedStatus()

log.Infof("Successfully found attached EBS volume: %v", ebs.String())
if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
log.Info("No more attachments to scan for. Stopping scan ticker.")
w.scanTicker.Stop()
}
}
}

// RemoveAttachment will stop tracking an EBS attachment
func (w *EBSWatcher) RemoveAttachment(volumeID string) {
w.mailbox <- func() {
w.removeEBSAttachment(volumeID)
}
}

func (w *EBSWatcher) removeEBSAttachment(volumeID string) {
// TODO: Remove the EBS volume from the data client.
w.agentState.RemoveEBSAttachment(volumeID)
if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
log.Info("No more attachments to scan for. Stopping scan ticker.")
w.scanTicker.Stop()
}
}

// addEBSAttachmentToState adds an EBS attachment to state, and start its ack timer
func (w *EBSWatcher) addEBSAttachmentToState(ebs *apiebs.ResourceAttachment) error {
volumeId := string(ebs.AttachmentProperties[apiebs.VolumeIdName])
err := ebs.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
if err != nil {
return err
}

w.agentState.AddEBSAttachment(ebs)
return nil
}

// handleEBSAckTimeout removes EBS attachment from agent state after the EBS ack timeout
func (w *EBSWatcher) handleEBSAckTimeout(volumeId string) {
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Ignoring unmanaged EBS attachment volume ID=%s", volumeId)
return
}
if !ebsAttachment.IsSent() {
log.Warnf("Timed out waiting for EBS ack; removing EBS attachment record %v", ebsAttachment.String())
w.RemoveAttachment(volumeId)
}
}

0 comments on commit 5266fda

Please sign in to comment.