Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Model transformer: model reconciliation for agent upgrades #3878

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 16 additions & 3 deletions agent/data/client.go
Expand Up @@ -19,7 +19,9 @@ import (

"github.com/aws/amazon-ecs-agent/agent/api/container"
"github.com/aws/amazon-ecs-agent/agent/api/task"
"github.com/aws/amazon-ecs-agent/agent/data/transformationfunctions"
"github.com/aws/amazon-ecs-agent/agent/engine/image"
"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"
"github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
bolt "go.etcd.io/bbolt"
)
Expand All @@ -33,6 +35,7 @@ const (
imagesBucketName = "images"
eniAttachmentsBucketName = "eniattachments"
metadataBucketName = "metadata"
emptyAgentVersionMsg = "No version info available in boltDB. Either this is a fresh instance, or we were using state file to persist data. Transformer not applicable."
)

var (
Expand Down Expand Up @@ -94,7 +97,8 @@ type Client interface {

// client implements the Client interface using boltdb as the backing data store.
type client struct {
db *bolt.DB
db *bolt.DB
transformer *modeltransformer.Transformer
}

// New returns a data client that implements the Client interface with boltdb.
Expand All @@ -115,7 +119,8 @@ func NewWithSetup(dataDir string) (Client, error) {
return setup(dataDir)
}

// setup initiates the boltdb client and makes sure the buckets we use are created.
// setup initiates the boltdb client and makes sure the buckets we use and transformer are created, and
// registers transformation functions to transformer.
func setup(dataDir string) (*client, error) {
db, err := bolt.Open(filepath.Join(dataDir, dbName), dbMode, nil)
err = db.Update(func(tx *bolt.Tx) error {
Expand All @@ -128,11 +133,19 @@ func setup(dataDir string) (*client, error) {

return nil
})

// create transformer
transformer := modeltransformer.NewTransFormer()

// registering task transformation functions
transformationfunctions.RegisterTaskTransformationFunctions(transformer)

if err != nil {
return nil, err
}
return &client{
db: db,
db: db,
transformer: transformer,
}, nil
}

Expand Down
6 changes: 5 additions & 1 deletion agent/data/client_test.go
Expand Up @@ -20,6 +20,8 @@ import (
"path/filepath"
"testing"

"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"

"github.com/stretchr/testify/require"
bolt "go.etcd.io/bbolt"
)
Expand All @@ -28,6 +30,7 @@ func newTestClient(t *testing.T) Client {
testDir := t.TempDir()

testDB, err := bolt.Open(filepath.Join(testDir, dbName), dbMode, nil)
transformer := modeltransformer.NewTransFormer()
require.NoError(t, err)
require.NoError(t, testDB.Update(func(tx *bolt.Tx) error {
for _, b := range buckets {
Expand All @@ -40,7 +43,8 @@ func newTestClient(t *testing.T) Client {
return nil
}))
testClient := &client{
db: testDB,
db: testDB,
transformer: transformer,
}

t.Cleanup(func() {
Expand Down
115 changes: 115 additions & 0 deletions agent/data/models/task_models.go
@@ -0,0 +1,115 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

//lint:file-ignore U1000 Ignore unused fields as some of them are only used by Fargate

package models

import (
"sync"
"time"

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
"github.com/aws/amazon-ecs-agent/agent/api/task"
apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status"
resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types"
nlappmesh "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/appmesh"
)

// Task_1_0_0 is "the original model" before model transformer is created.
type Task_1_0_0 struct {
Arn string
id string
Overrides task.TaskOverrides `json:"-"`
Family string
Version string
ServiceName string
Containers []*apicontainer.Container
Associations []task.Association `json:"associations"`
ResourcesMapUnsafe resourcetype.ResourcesMap `json:"resources"`
Volumes []task.TaskVolume `json:"volumes"`
CPU float64 `json:"Cpu,omitempty"`
Memory int64 `json:"Memory,omitempty"`
DesiredStatusUnsafe apitaskstatus.TaskStatus `json:"DesiredStatus"`
KnownStatusUnsafe apitaskstatus.TaskStatus `json:"KnownStatus"`
KnownStatusTimeUnsafe time.Time `json:"KnownTime"`
PullStartedAtUnsafe time.Time `json:"PullStartedAt"`
PullStoppedAtUnsafe time.Time `json:"PullStoppedAt"`
ExecutionStoppedAtUnsafe time.Time `json:"ExecutionStoppedAt"`
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`
ExecutionCredentialsID string `json:"executionCredentialsID"`
credentialsID string
credentialsRelativeURIUnsafe string
ENIs task.TaskENIs `json:"ENI"`
AppMesh *nlappmesh.AppMesh
MemoryCPULimitsEnabled bool `json:"MemoryCPULimitsEnabled,omitempty"`
PlatformFields task.PlatformFields `json:"PlatformFields,omitempty"`
terminalReason string
terminalReasonOnce sync.Once
PIDMode string `json:"PidMode,omitempty"`
IPCMode string `json:"IpcMode,omitempty"`
NvidiaRuntime string `json:"NvidiaRuntime,omitempty"`
LocalIPAddressUnsafe string `json:"LocalIPAddress,omitempty"`
LaunchType string `json:"LaunchType,omitempty"`
lock sync.RWMutex
setIdOnce sync.Once
ServiceConnectConfig *serviceconnect.Config `json:"ServiceConnectConfig,omitempty"`
ServiceConnectConnectionDrainingUnsafe bool `json:"ServiceConnectConnectionDraining,omitempty"`
NetworkMode string `json:"NetworkMode,omitempty"`
IsInternal bool `json:"IsInternal,omitempty"`
}

// Task_1_x_0 is an example new model with breaking change. Latest Task_1_x_0 should be the same as current Task model.
// TODO: update this model when introducing first actual transformation function
type Task_1_x_0 struct {
Arn string
id string
Overrides task.TaskOverrides `json:"-"`
Family string
Version string
ServiceName string
Containers []*apicontainer.Container
Associations []task.Association `json:"associations"`
ResourcesMapUnsafe resourcetype.ResourcesMap `json:"resources"`
Volumes []task.TaskVolume `json:"volumes"`
CPU float64 `json:"Cpu,omitempty"`
Memory int64 `json:"Memory,omitempty"`
DesiredStatusUnsafe apitaskstatus.TaskStatus `json:"DesiredStatus"`
KnownStatusUnsafe apitaskstatus.TaskStatus `json:"KnownStatus"`
KnownStatusTimeUnsafe time.Time `json:"KnownTime"`
PullStartedAtUnsafe time.Time `json:"PullStartedAt"`
PullStoppedAtUnsafe time.Time `json:"PullStoppedAt"`
ExecutionStoppedAtUnsafe time.Time `json:"ExecutionStoppedAt"`
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`
ExecutionCredentialsID string `json:"executionCredentialsID"`
credentialsID string
credentialsRelativeURIUnsafe string
NetworkInterfaces task.TaskENIs `json:"NetworkInterfaces"`
AppMesh *nlappmesh.AppMesh
MemoryCPULimitsEnabled bool `json:"MemoryCPULimitsEnabled,omitempty"`
PlatformFields task.PlatformFields `json:"PlatformFields,omitempty"`
terminalReason string
terminalReasonOnce sync.Once
PIDMode string `json:"PidMode,omitempty"`
IPCMode string `json:"IpcMode,omitempty"`
NvidiaRuntime string `json:"NvidiaRuntime,omitempty"`
LocalIPAddressUnsafe string `json:"LocalIPAddress,omitempty"`
LaunchType string `json:"LaunchType,omitempty"`
lock sync.RWMutex
setIdOnce sync.Once
ServiceConnectConfig *serviceconnect.Config `json:"ServiceConnectConfig,omitempty"`
ServiceConnectConnectionDrainingUnsafe bool `json:"ServiceConnectConnectionDraining,omitempty"`
NetworkMode string `json:"NetworkMode,omitempty"`
IsInternal bool `json:"IsInternal,omitempty"`
}
16 changes: 15 additions & 1 deletion agent/data/task_client.go
Expand Up @@ -18,6 +18,8 @@ import (

apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -50,7 +52,19 @@ func (c *client) GetTasks() ([]*apitask.Task, error) {
bucket := tx.Bucket([]byte(tasksBucketName))
return walk(bucket, func(id string, data []byte) error {
task := apitask.Task{}
if err := json.Unmarshal(data, &task); err != nil {
// transform the model before loading it to agent state. this is a noop for now.
agentVersionInDB, err := c.GetMetadata(AgentVersionKey)
if err != nil {
logger.Info(emptyAgentVersionMsg)
} else {
if c.transformer.IsUpgrade(version.Version, agentVersionInDB) {
data, err = c.transformer.TransformTask(agentVersionInDB, data)
if err != nil {
return err
}
}
}
if err = json.Unmarshal(data, &task); err != nil {
return err
}
tasks = append(tasks, &task)
Expand Down
69 changes: 69 additions & 0 deletions agent/data/transformationfunctions/tasktf.go
@@ -0,0 +1,69 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package transformationfunctions

import (
"encoding/json"
"fmt"

"github.com/aws/amazon-ecs-agent/agent/data/models"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"
)

// RegisterTaskTransformationFunctions calls all registerTaskTransformationFunctions<x_y_z> in ascending order.
// (from lower threshold version to higher threshold version) thresholdVersion is the version we introduce a breaking change in.
// All versions below threshold version need to go through that specific transformation function
func RegisterTaskTransformationFunctions(t *modeltransformer.Transformer) {
registerTaskTransformationFunction1_x_0(t)
}

// registerTaskTransformationFunction1_x_0 is a template RegisterTaskTransformation function.
// It registers the transformation functions that translate the task model from models.Task_1_0_0 to models.Task_1_x_0
// Future addition to transformation functions should follow the same pattern. This current performs noop
// TODO: edit this function when introducing first actual transformation function, and add unit test
func registerTaskTransformationFunction1_x_0(t *modeltransformer.Transformer) {
thresholdVersion := "1.0.0" // this assures it never actually gets executed
Realmonia marked this conversation as resolved.
Show resolved Hide resolved
t.AddTaskTransformationFunctions(thresholdVersion, func(dataIn []byte) ([]byte, error) {
logger.Info(fmt.Sprintf("Executing transformation function with threshold %s.", thresholdVersion))
oldModel := models.Task_1_0_0{}
newModel := models.Task_1_x_0{}
var intermediate map[string]interface{}

// Load json to old model (so that we can capture some fields before it is deleted)
err := json.Unmarshal(dataIn, &oldModel)
if err != nil {
return nil, err
}

// Load json to intermediate model to process
err = json.Unmarshal(dataIn, &intermediate)
if err != nil {
return nil, err
}

// Actual process to process
delete(intermediate, "ENIs")
Realmonia marked this conversation as resolved.
Show resolved Hide resolved
modifiedJSON, err := json.Marshal(intermediate)
if err != nil {
return nil, err
}
err = json.Unmarshal(modifiedJSON, &newModel)
newModel.NetworkInterfaces = oldModel.ENIs
dataOut, err := json.Marshal(&newModel)
logger.Info(fmt.Sprintf("Transform associated with version %s finished.", thresholdVersion))
return dataOut, err
})
logger.Info(fmt.Sprintf("Registered transformation function with threshold %s.", thresholdVersion))
}
35 changes: 35 additions & 0 deletions agent/data/transformationfunctions/tasktf_test.go
@@ -0,0 +1,35 @@
//go:build unit
// +build unit

// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package transformationfunctions

import (
"testing"

"github.com/aws/amazon-ecs-agent/ecs-agent/modeltransformer"

"github.com/stretchr/testify/assert"
)

const (
expectedTaskTransformationChainLength = 1
)

func TestRegisterTaskTransformationFunctions(t *testing.T) {
transformer := modeltransformer.NewTransFormer()
RegisterTaskTransformationFunctions(transformer)
assert.Equal(t, expectedTaskTransformationChainLength, transformer.GetNumberOfTransformationFunctions("Task"))
}