/
task.go
133 lines (115 loc) · 3.54 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// Package logging contains utility functions for ECS logging.
package logging
import (
"fmt"
"io"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/copilot-cli/internal/pkg/aws/cloudwatchlogs"
"github.com/aws/copilot-cli/internal/pkg/aws/ecs"
"github.com/aws/copilot-cli/internal/pkg/task"
"github.com/aws/copilot-cli/internal/pkg/term/log"
)
const (
numCWLogsCallsPerRound = 10
fmtTaskLogGroupName = "/copilot/%s"
// e.g., copilot-task/python/4f8243e83f8a4bdaa7587fa1eaff2ea3
fmtTaskLogStreamName = "copilot-task/%s/%s"
)
// TasksDescriber describes ECS tasks.
type TasksDescriber interface {
DescribeTasks(cluster string, taskARNs []string) ([]*ecs.Task, error)
}
// TaskClient retrieves the logs of Amazon ECS tasks.
type TaskClient struct {
// Inputs to the task client.
groupName string
tasks []*task.Task
eventsWriter io.Writer
eventsLogger logGetter
taskDescriber TasksDescriber
// Replaced in tests.
sleep func()
}
// NewTaskClient returns a TaskClient that can retrieve logs from the given tasks under the groupName.
func NewTaskClient(sess *session.Session, groupName string, tasks []*task.Task) *TaskClient {
return &TaskClient{
groupName: groupName,
tasks: tasks,
taskDescriber: ecs.New(sess),
eventsLogger: cloudwatchlogs.New(sess),
eventsWriter: log.OutputWriter,
sleep: func() {
time.Sleep(cloudwatchlogs.SleepDuration)
},
}
}
// WriteEventsUntilStopped writes tasks' events to a writer until all tasks have stopped.
func (t *TaskClient) WriteEventsUntilStopped() error {
in := cloudwatchlogs.LogEventsOpts{
LogGroup: fmt.Sprintf(fmtTaskLogGroupName, t.groupName),
}
for {
logStreams, err := t.logStreamNamesFromTasks(t.tasks)
if err != nil {
return err
}
in.LogStreamPrefixFilters = logStreams
for i := 0; i < numCWLogsCallsPerRound; i++ {
logEventsOutput, err := t.eventsLogger.LogEvents(in)
if err != nil {
return fmt.Errorf("get task log events: %w", err)
}
if err := WriteHumanLogs(t.eventsWriter, cwEventsToHumanJSONStringers(logEventsOutput.Events)); err != nil {
return fmt.Errorf("write log event: %w", err)
}
in.StreamLastEventTime = logEventsOutput.StreamLastEventTime
t.sleep()
}
stopped, err := t.allTasksStopped()
if err != nil {
return err
}
if stopped {
return nil
}
}
}
func (t *TaskClient) allTasksStopped() (bool, error) {
taskARNs := make([]string, len(t.tasks))
for idx, task := range t.tasks {
taskARNs[idx] = task.TaskARN
}
// NOTE: all tasks are deployed to the same cluster and there are at least one tasks being deployed
cluster := t.tasks[0].ClusterARN
tasksResp, err := t.taskDescriber.DescribeTasks(cluster, taskARNs)
if err != nil {
return false, fmt.Errorf("describe tasks: %w", err)
}
stopped := true
var runningTasks []*task.Task
for _, t := range tasksResp {
if *t.LastStatus != ecs.DesiredStatusStopped {
stopped = false
runningTasks = append(runningTasks, &task.Task{
ClusterARN: *t.ClusterArn,
TaskARN: *t.TaskArn,
})
}
}
t.tasks = runningTasks
return stopped, nil
}
func (t *TaskClient) logStreamNamesFromTasks(tasks []*task.Task) ([]string, error) {
var logStreamNames []string
for _, task := range tasks {
id, err := ecs.TaskID(task.TaskARN)
if err != nil {
return nil, fmt.Errorf("parse task ID from ARN %s", task.TaskARN)
}
logStreamNames = append(logStreamNames, fmt.Sprintf(fmtTaskLogStreamName, t.groupName, id))
}
return logStreamNames, nil
}