Skip to content

Commit

Permalink
Ensure that labels for mesos tasks launched via MesosCommandExecutor
Browse files Browse the repository at this point in the history
are collected as well
  • Loading branch information
sashankreddya committed Aug 2, 2018
1 parent a390d2e commit 2c96ceb
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 12 deletions.
28 changes: 28 additions & 0 deletions container/mesos/client.go
Expand Up @@ -30,6 +30,7 @@ import (

const (
maxRetryAttempts = 3
invalidPID = -1
)

var (
Expand All @@ -43,6 +44,7 @@ type client struct {

type mesosAgentClient interface {
ContainerInfo(id string) (*containerInfo, error)
ContainerPid(id string) (int, error)
}

type containerInfo struct {
Expand Down Expand Up @@ -90,6 +92,32 @@ func (self *client) ContainerInfo(id string) (*containerInfo, error) {
}, nil
}

// Get the Pid of the container
func (self *client) ContainerPid(id string) (int, error) {
var pid int
var err error
err = retry.Retry(
func(attempt uint) error {
c, err := self.ContainerInfo(id)
if err != nil {
return err
}

if c.cntr.ContainerStatus != nil {
pid = int(*c.cntr.ContainerStatus.ExecutorPID)
} else {
err = fmt.Errorf("error fetching Pid")
}
return err
},
strategy.Limit(maxRetryAttempts),
)
if err != nil {
return invalidPID, fmt.Errorf("failed to fetch pid")
}
return pid, err
}

func (self *client) getContainer(id string) (*mContainer, error) {
// Get all containers
cntrs, err := self.getContainers()
Expand Down
17 changes: 17 additions & 0 deletions container/mesos/client_test.go
Expand Up @@ -32,6 +32,23 @@ func (c *FakeMesosAgentClient) ContainerInfo(id string) (*containerInfo, error)
return cInfo, nil
}

func (c *FakeMesosAgentClient) ContainerPid(id string) (int, error) {
if c.err != nil {
return invalidPID, c.err
}
cInfo, ok := c.cntrInfo[id]
if !ok {
return invalidPID, fmt.Errorf("can't locate container %s", id)
}

if cInfo.cntr.ContainerStatus == nil {
return invalidPID, fmt.Errorf("error fetching Pid")
}

pid := int(*cInfo.cntr.ContainerStatus.ExecutorPID)
return pid, nil
}

func fakeMesosAgentClient(cntrInfo map[string]*containerInfo, err error) mesosAgentClient {
return &FakeMesosAgentClient{
err: err,
Expand Down
7 changes: 1 addition & 6 deletions container/mesos/factory.go
Expand Up @@ -107,16 +107,11 @@ func (self *mesosFactory) CanHandleAndAccept(name string) (handle bool, accept b
// Check if the container is known to mesos and it is active.
id := ContainerNameToMesosId(name)

c, err := self.client.ContainerInfo(id)
_, err = self.client.ContainerInfo(id)
if err != nil {
return false, true, fmt.Errorf("error getting running container: %v", err)
}

pid := int(*c.cntr.ContainerStatus.ExecutorPID)
if pid <= 0 {
return false, true, fmt.Errorf("mesos container pid: %d is invalid", pid)
}

return true, true, nil
}

Expand Down
5 changes: 4 additions & 1 deletion container/mesos/handler.go
Expand Up @@ -94,7 +94,10 @@ func newMesosContainerHandler(
}

labels := cinfo.labels
pid := int(*cinfo.cntr.ContainerStatus.ExecutorPID)
pid, err := client.ContainerPid(id)
if err != nil {
return nil, err
}

libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, pid, includedMetrics)

Expand Down
27 changes: 26 additions & 1 deletion container/mesos/mesos_agent.go
Expand Up @@ -62,14 +62,39 @@ func (s *state) GetExecutor(id string) (*mesos.ExecutorInfo, error) {

// GetTask returns a task launched by given executor.
func (s *state) GetTask(exID string) (*mesos.Task, error) {
// Check if task is in Launched Tasks list
for _, t := range s.st.GetTasks.LaunchedTasks {
if t.ExecutorID.Value == exID {
if s.isMatchingTask(&t, exID) {
return &t, nil
}
}

// Check if task is in Queued Tasks list
for _, t := range s.st.GetTasks.QueuedTasks {
if s.isMatchingTask(&t, exID) {
return &t, nil
}
}
return nil, fmt.Errorf("unable to find task matching executor id %s", exID)
}

func (s *state) isMatchingTask(t *mesos.Task, exID string) bool {
// MESOS-9111: For tasks launched through mesos command/default executor, the
// executorID(which is same as the taskID) field is not filled in the TaskInfo object.
// The workaround is compare with taskID field if executorID is empty
if t.ExecutorID != nil {
if t.ExecutorID.Value == exID {
return true
}
} else {
if t.TaskID.Value == exID {
return true
}
}

return false
}

func (s *state) fetchLabelsFromTask(exID string, labels map[string]string) error {
t, err := s.GetTask(exID)
if err != nil {
Expand Down
27 changes: 23 additions & 4 deletions container/mesos/mesos_agent_test.go
Expand Up @@ -47,12 +47,15 @@ func PopulateExecutors(exID string) *agent.Response_GetExecutors {
return execs
}

func PopulateTasks() *agent.Response_GetTasks {
func PopulateTasks(taskID string, exID string) *agent.Response_GetTasks {
tasks := &agent.Response_GetTasks{}
tasks.LaunchedTasks = make([]mesos.Task, 1)

task := mesos.Task{
ExecutorID: &mesos.ExecutorID{Value: "exec-id1"},
TaskID: mesos.TaskID{Value: taskID},
}
if len(exID) > 0 {
task.ExecutorID = &mesos.ExecutorID{Value: exID}
}

task.Resources = make([]mesos.Resource, 1)
Expand Down Expand Up @@ -92,7 +95,23 @@ func TestFetchLabels(t *testing.T) {
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(),
GetTasks: PopulateTasks("task-id1", "exec-id1"),
},
expectedError: nil,
expectedLabels: map[string]string{
framework: "TestFramework",
source: "source1",
schedulerSLA: nonRevocable,
"key1": "value1",
},
},
{
frameworkID: "fw-id1",
executorID: "task-id1",
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("task-id1"),
GetTasks: PopulateTasks("task-id1", ""),
},
expectedError: nil,
expectedLabels: map[string]string{
Expand All @@ -108,7 +127,7 @@ func TestFetchLabels(t *testing.T) {
agentState: &agent.Response_GetState{
GetFrameworks: PopulateFrameworks("fw-id1"),
GetExecutors: PopulateExecutors("exec-id1"),
GetTasks: PopulateTasks(),
GetTasks: PopulateTasks("task-id1", "exec-id1"),
},
expectedError: fmt.Errorf("framework ID \"fw-id2\" not found: unable to find framework id fw-id2"),
expectedLabels: map[string]string{},
Expand Down

0 comments on commit 2c96ceb

Please sign in to comment.